"offset INT SYSTEM_METADATA("offset")" This is actually Oracle- or MySQL-style computed column syntax.
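For reference, the generated-column shape being compared against here can be sketched with SQLite, whose `GENERATED ALWAYS AS` clause follows the same SQL-standard form as MySQL's and Oracle's (the table and column names below are illustrative, not from this thread; SQLite >= 3.31 is assumed). The sketch also demonstrates the insert restriction on generated columns that the thread discusses for SQL Server and PostgreSQL:

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Standard generated-column syntax: explicit type plus an expression.
# VIRTUAL means the value is computed on read instead of being stored.
conn.execute(
    "CREATE TABLE t (height_cm REAL, "
    "height_in REAL GENERATED ALWAYS AS (height_cm / 2.54) VIRTUAL)"
)
conn.execute("INSERT INTO t (height_cm) VALUES (254.0)")
print(conn.execute("SELECT height_in FROM t").fetchone()[0])  # -> 100.0

# A generated column cannot be the target of an INSERT -- the same
# restriction cited for SQL Server and PostgreSQL later in the thread.
try:
    conn.execute("INSERT INTO t (height_cm, height_in) VALUES (1.0, 2.0)")
except sqlite3.OperationalError as e:
    print("rejected:", e)
```

The proposed Flink DDL (`offset INT SYSTEM_METADATA("offset")`) borrows this "type + generator" shape, but with a connector-specific metadata function in place of an expression.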
"You are right that one could argue that "timestamp", "headers" are something like "key" and "value"" I have the same feeling, both key/value and headers/timestamp are *real* data stored in the consumed record, they are not computed or generated. "Trying to solve everything via properties sounds rather like a hack to me" Things are not that hacky if we can unify the routines or the definitions (all from the computed column way or all from the table options). I also think that it is hacky that we mix two kinds of syntax for different kinds of metadata (read-only and read-write). In this FLIP, we declare the Kafka key fields with table options but use SYSTEM_METADATA for other metadata, which is hacky, or at least inconsistent. Kurt Young <ykt...@gmail.com> wrote on Wed, Sep 9, 2020 at 4:48 PM: > I would vote for `offset INT SYSTEM_METADATA("offset")`. > > I don't think we can stick with the SQL standard in the DDL part forever, > especially as there are more and more > requirements coming from different connectors and external systems. > > Best, > Kurt > > > On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote: > > > Hi Jark, > > > > now we are back at the original design proposed by Dawid :D Yes, we > > should be cautious about adding new syntax. But the length of this > > discussion shows that we are looking for a good long-term solution. In > > this case I would rather vote for a deep integration into the syntax. > > > > Computed columns are also not SQL standard compliant. And our DDL is > > neither, so we have some degree of freedom here. > > > > Trying to solve everything via properties sounds rather like a hack to > > me. You are right that one could argue that "timestamp", "headers" are > > something like "key" and "value". 
However, mixing > > > > `offset AS SYSTEM_METADATA("offset")` > > > > and > > > > `'timestamp.field' = 'ts'` > > > > looks more confusing to users than an explicit > > > > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)` > > > > or > > > > `offset INT SYSTEM_METADATA("offset")` > > > > that is symmetric for both source and sink. > > > > What do others think? > > > > Regards, > > Timo > > > > > > On 09.09.20 10:09, Jark Wu wrote: > > > Hi everyone, > > > > > > I think we have a conclusion that the writable metadata shouldn't be > > > defined as a computed column, but a normal column. > > > > > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the > approaches. > > > However, it is not SQL standard compliant, so we need to be cautious > enough > > > when adding new syntax. > > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to > > > resolve the query-sink schema problem if it is read-only metadata. That > > > adds more stuff to learn for users. > > > > > > From my point of view, the "timestamp", "headers" are something like > > "key" > > > and "value" that are stored with the real data. So why not define the > > > "timestamp" in the same way as "key" by using a "timestamp.field" > > > connector option? > > > On the other side, the read-only metadata, such as "offset", shouldn't > be > > > defined as a normal column. So why not use the existing computed column > > > syntax for such metadata? Then we don't have the query-sink schema > > problem. > > > So here is my proposal: > > > > > > CREATE TABLE kafka_table ( > > > id BIGINT, > > > name STRING, > > > col1 STRING, > > > col2 STRING, > > > ts TIMESTAMP(3) WITH LOCAL TIME ZONE, -- ts is a normal field, so > > can > > > be read and written. 
> > > offset AS SYSTEM_METADATA("offset") > > > ) WITH ( > > > 'connector' = 'kafka', > > > 'topic' = 'test-topic', > > > 'key.fields' = 'id, name', > > > 'key.format' = 'csv', > > > 'value.format' = 'avro', > > > 'timestamp.field' = 'ts' -- define the mapping of Kafka timestamp > > > ); > > > > > > INSERT INTO kafka_table > > > SELECT id, name, col1, col2, rowtime FROM another_table; > > > > > > I think this can solve all the problems without introducing any new > > syntax. > > > The only minor disadvantage is that we separate the definition > way/syntax > > > of read-only metadata and read-write fields. > > > However, I don't think this is a big problem. > > > > > > Best, > > > Jark > > > > > > > > > On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote: > > > > > >> Hi Kurt, > > >> > > >> thanks for sharing your opinion. I'm totally up for not reusing > computed > > >> columns. I think Jark was a big supporter of this syntax, @Jark are > you > > >> fine with this as well? The non-computed column approach was only a > > >> "slightly rejected alternative". > > >> > > >> Furthermore, we would need to think about how such a new design > > >> influences the LIKE clause though. > > >> > > >> However, we should still keep the `PERSISTED` keyword as it influences > > >> the query->sink schema. If you look at the list of metadata for > existing > > >> connectors and formats, we currently offer only two writable metadata > > >> fields. Otherwise, one would need to declare two tables whenever a > > >> metadata column is read (one for the source, one for the sink). This > > >> can be quite inconvenient e.g. for just reading the topic. > > >> > > >> Regards, > > >> Timo > > >> > > >> > > >> On 09.09.20 08:52, Kurt Young wrote: > > >>> I also share the concern that reusing the computed column syntax but > > with > > >>> different semantics > > >>> would confuse users a lot. 
> > >>> > > >>> Besides, I think metadata fields are conceptually not the same as > > >>> computed columns. The metadata > > >>> field is a connector-specific thing and it only contains the > > information > > >>> about where the field comes > > >>> from (for sources) or where the field needs to be written to (for > > >>> sinks). It's more similar to normal > > >>> fields, with the assumption that all these fields go to the data > > >> part. > > >>> > > >>> Thus I lean more toward the rejected alternative that Timo mentioned. > > And I > > >>> think we don't need the > > >>> PERSISTED keyword, SYSTEM_METADATA should be enough. > > >>> > > >>> During implementation, the framework only needs to pass such <field, > > >>> metadata field> information to the > > >>> connector, and the logic of handling such fields inside the connector > > >>> should be straightforward. > > >>> > > >>> Regarding the downside Timo mentioned: > > >>> > > >>>> The disadvantage is that users cannot call UDFs or parse timestamps. > > >>> > > >>> I think this is fairly simple to solve. Since the metadata field > isn't > > a > > >>> computed column anymore, we can support > > >>> referencing such fields in the computed column. For example: > > >>> > > >>> CREATE TABLE kafka_table ( > > >>> id BIGINT, > > >>> name STRING, > > >>> timestamp STRING SYSTEM_METADATA("timestamp"), -- get the > > >> timestamp > > >>> field from metadata > > >>> ts AS to_timestamp(timestamp) -- normal computed column, parse > > the > > >>> string to TIMESTAMP type by using the metadata field > > >>> ) WITH ( > > >>> ... > > >>> ) > > >>> > > >>> Best, > > >>> Kurt > > >>> > > >>> > > >>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org> > > wrote: > > >>> > > >>>> Hi Leonard, > > >>>> > > >>>> the only alternative I see is that we introduce a concept that is > > >>>> completely different from computed columns. This is also mentioned in > > the > > >>>> rejected alternative section of the FLIP. 
Something like: > > >>>> > > >>>> CREATE TABLE kafka_table ( > > >>>> id BIGINT, > > >>>> name STRING, > > >>>> timestamp INT SYSTEM_METADATA("timestamp") PERSISTED, > > >>>> headers MAP<STRING, BYTES> SYSTEM_METADATA("headers") > PERSISTED > > >>>> ) WITH ( > > >>>> ... > > >>>> ) > > >>>> > > >>>> This way we would avoid confusion entirely and can easily map columns > to > > >>>> metadata columns. The disadvantage is that users cannot call UDFs or > > >>>> parse timestamps. This would need to be done in a real computed > > column. > > >>>> > > >>>> I'm happy about better alternatives. > > >>>> > > >>>> Regards, > > >>>> Timo > > >>>> > > >>>> > > >>>> On 08.09.20 15:37, Leonard Xu wrote: > > >>>>> Hi, Timo > > >>>>> > > >>>>> Thanks for driving this FLIP. > > >>>>> > > >>>>> Sorry but I have a concern about the "Writing metadata via > > DynamicTableSink" > > >>>> section: > > >>>>> > > >>>>> CREATE TABLE kafka_table ( > > >>>>> id BIGINT, > > >>>>> name STRING, > > >>>>> timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT) > > >> PERSISTED, > > >>>>> headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING, > > BYTES>) > > >>>> PERSISTED > > >>>>> ) WITH ( > > >>>>> ... > > >>>>> ) > > >>>>> An insert statement could look like: > > >>>>> > > >>>>> INSERT INTO kafka_table VALUES ( > > >>>>> (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...))) > > >>>>> ) > > >>>>> > > >>>>> The proposed INSERT syntax does not make sense to me, because it > > >> contains > > >>>> computed (generated) columns. > > >>>>> Both SQL Server and PostgreSQL do not allow inserting values into > > computed > > >>>> columns even if they are persisted; this breaks the generated column > > >> semantics > > >>>> and may confuse users. > > >>>>> > > >>>>> For SQL Server computed columns[1]: > > >>>>>> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ] > > >> ]... > > >>>>>> NOTE: A computed column cannot be the target of an INSERT or > UPDATE > > >>>> statement. 
> > >>>>> > > >>>>> For PostgreSQL generated columns[2]: > > >>>>>> height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) > STORED > > >>>>>> NOTE: A generated column cannot be written to directly. In INSERT > or > > >>>> UPDATE commands, a value cannot be specified for a generated column, > > but > > >>>> the keyword DEFAULT may be specified. > > >>>>> > > >>>>> It shouldn't be allowed to set/update a value for a generated column; > > after > > >>>> looking up SQL 2016: > > >>>>>> <insert statement> ::= > > >>>>>> INSERT INTO <insertion target> <insert columns and source> > > >>>>>> > > >>>>>> If <contextually typed table value constructor> CTTVC is > specified, > > >>>> then every <contextually typed row > > >>>>>> value constructor element> simply contained in CTTVC whose > > >> positionally > > >>>> corresponding <column name> > > >>>>>> in <insert column list> references a column of which some > underlying > > >>>> column is a generated column shall > > >>>>>> be a <default specification>. > > >>>>>> A <default specification> specifies the default value of some > > >>>> associated item. > > >>>>> > > >>>>> > > >>>>> [1] > > >>>> > > >> > > > https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15 > > >>>> < > > >>>> > > >> > > > https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15 > > >>>>> > > >>>>> [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html > < > > >>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html> > > >>>>> > > >>>>>> On Sep 8, 2020, at 17:31, Timo Walther <twal...@apache.org> wrote: > > >>>>>> > > >>>>>> Hi Jark, > > >>>>>> > > >>>>>> according to Flink's and Calcite's casting definition in [1][2] > > >>>> TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. 
If > not, > > we > > >>>> will make it possible ;-) > > >>>>>> > > >>>>>> I'm aware of DeserializationSchema.getProducedType but I think > that > > >>>> this method is actually misplaced. The type should rather be passed > to > > >> the > > >>>> source itself. > > >>>>>> > > >>>>>> For our Kafka SQL source, we will also not use this method because > > the > > >>>> Kafka source will add its own metadata in addition to the > > >>>> DeserializationSchema. So DeserializationSchema.getProducedType will > > >> never > > >>>> be read. > > >>>>>> > > >>>>>> For now, I suggest leaving out the `DataType` from > > >>>> DecodingFormat.applyReadableMetadata. Also because the format's > > physical > > >>>> type is passed later in `createRuntimeDecoder`. If necessary, it can > > be > > >>>> computed manually from consumedType + metadata types. We will provide > a > > >>>> metadata utility class for that. > > >>>>>> > > >>>>>> Regards, > > >>>>>> Timo > > >>>>>> > > >>>>>> > > >>>>>> [1] > > >>>> > > >> > > > https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200 > > >>>>>> [2] > > >>>> > > >> > > > https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254 > > >>>>>> > > >>>>>> > > >>>>>> On 08.09.20 10:52, Jark Wu wrote: > > >>>>>>> Hi Timo, > > >>>>>>> The updated CAST SYSTEM_METADATA behavior sounds good to me. I > just > > >>>> noticed > > >>>>>>> that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME > > >>>> ZONE". > > >>>>>>> So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL > > >> TIME > > >>>>>>> ZONE" as the defined type of the Kafka timestamp? I think this makes > > >> sense, > > >>>>>>> because it represents the milliseconds since epoch. > > >>>>>>> Regarding "DeserializationSchema doesn't need TypeInfo", I don't > > >> think > > >>>> so. 
> > >>>>>>> The DeserializationSchema implements ResultTypeQueryable, thus > the > > >>>>>>> implementation needs to return an output TypeInfo. > > >>>>>>> Besides, FlinkKafkaConsumer also > > >>>>>>> calls DeserializationSchema.getProducedType as the produced type > of > > >> the > > >>>>>>> source function [1]. > > >>>>>>> Best, > > >>>>>>> Jark > > >>>>>>> [1]: > > >>>>>>> > > >>>> > > >> > > > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066 > > >>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> > > >> wrote: > > >>>>>>>> Hi everyone, > > >>>>>>>> > > >>>>>>>> I updated the FLIP again and hope that I could address the > > mentioned > > >>>>>>>> concerns. > > >>>>>>>> > > >>>>>>>> @Leonard: Thanks for the explanation. I wasn't aware that ts_ms > > and > > >>>>>>>> source.ts_ms have different semantics. I updated the FLIP and > > expose > > >>>> the > > >>>>>>>> most commonly used properties separately. So frequently used > > >>>> properties > > >>>>>>>> are not hidden in the MAP anymore: > > >>>>>>>> > > >>>>>>>> debezium-json.ingestion-timestamp > > >>>>>>>> debezium-json.source.timestamp > > >>>>>>>> debezium-json.source.database > > >>>>>>>> debezium-json.source.schema > > >>>>>>>> debezium-json.source.table > > >>>>>>>> > > >>>>>>>> However, since other properties depend on the used > > connector/vendor, > > >>>> the > > >>>>>>>> remaining options are stored in: > > >>>>>>>> > > >>>>>>>> debezium-json.source.properties > > >>>>>>>> > > >>>>>>>> And accessed with: > > >>>>>>>> > > >>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS > > >> MAP<STRING, > > >>>>>>>> STRING>)['table'] > > >>>>>>>> > > >>>>>>>> Otherwise it is not possible to figure out the value and column > > type > > >>>>>>>> during validation. > > >>>>>>>> > > >>>>>>>> @Jark: You convinced me in relaxing the CAST constraints. 
I > added > a > >>>>>>>> dedicated sub-section to the FLIP: > > >>>>>>>> > > >>>>>>>> To make the use of SYSTEM_METADATA easier and avoid nested > > >> casting, > > >>>> we > > >>>>>>>> allow explicit casting to a target data type: > > >>>>>>>> > > >>>>>>>> rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) > WITH > > >>>> LOCAL > > >>>>>>>> TIME ZONE) > > >>>>>>>> > > >>>>>>>> A connector still produces and consumes the data type returned > by > > >>>>>>>> `listMetadata()`. The planner will insert necessary explicit > > casts. > > >>>>>>>> > > >>>>>>>> In any case, the user must provide a CAST such that the computed > > >>>> column > > >>>>>>>> receives a valid data type when constructing the table schema. > > >>>>>>>> > > >>>>>>>> "I don't see a reason why `DecodingFormat#applyReadableMetadata` > > >>>> needs a > > >>>>>>>> DataType argument." > > >>>>>>>> > > >>>>>>>> Correct, the DeserializationSchema doesn't need TypeInfo; it is > > always > > >>>>>>>> executed locally. It is the source that needs TypeInfo for > > >> serializing > > >>>>>>>> the record to the next operator. And that is what we > > provide. > > >>>>>>>> > > >>>>>>>> @Danny: > > >>>>>>>> > > >>>>>>>> “SYSTEM_METADATA("offset")` returns the NULL type by default” > > >>>>>>>> > > >>>>>>>> We can also use some other means to represent an UNKNOWN data > > type. > > >> In > > >>>>>>>> the Flink type system, we use the NullType for it. The important > > >> part > > >>>> is > > >>>>>>>> that the final data type is known for the entire computed > column. > > >> As I > > >>>>>>>> mentioned before, I would avoid the suggested option b) that > would > > >> be > > >>>>>>>> similar to your suggestion. The CAST should be enough and allows > > for > > >>>>>>>> complex expressions in the computed column. Option b) would need > > >>>> parser > > >>>>>>>> changes. 
> > >>>>>>>> > > >>>>>>>> Regards, > > >>>>>>>> Timo > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> On 08.09.20 06:21, Leonard Xu wrote: > > >>>>>>>>> Hi, Timo > > >>>>>>>>> > > >>>>>>>>> Thanks for your explanation and update, I have only one > question > > >> for > > >>>>>>>> the latest FLIP. > > >>>>>>>>> > > >>>>>>>>> About the MAP<STRING, STRING> DataType of key > > >>>> 'debezium-json.source', if > > >>>>>>>> users want to use the table name metadata, they need to write: > > >>>>>>>>> tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') > > AS > > >>>>>>>> MAP<STRING, STRING>)['table'] > > >>>>>>>>> > > >>>>>>>>> The expression is a little complex for users. Could we only > > support > > >>>>>>>> the necessary metadata with simple DataTypes as follows? > > >>>>>>>>> tableName STRING AS > > >>>> CAST(SYSTEM_METADATA('debezium-json.source.table') AS > > >>>>>>>> STRING), > > >>>>>>>>> transactionTime LONG AS > > >>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT), > > >>>>>>>>> > > >>>>>>>>> In this way, we can simplify the expression. The most commonly used > > >>>> metadata in > > >>>>>>>> changelog formats may include > > >>>> 'database', 'table', 'source.ts_ms', 'ts_ms' from > > >>>>>>>> my side, > > >>>>>>>>> maybe we could only support them in the first version. > > >>>>>>>>> > > >>>>>>>>> Both Debezium and Canal have the above four metadata fields, and I'm > willing > > >> to > > >>>>>>>> take some subtasks in the next development if necessary. > > >>>>>>>>> > > >>>>>>>>> Debezium: > > >>>>>>>>> { > > >>>>>>>>> "before": null, > > >>>>>>>>> "after": { "id": 101,"name": "scooter"}, > > >>>>>>>>> "source": { > > >>>>>>>>> "db": "inventory", # 1. database name > > the > > >>>>>>>> changelog belongs to. > > >>>>>>>>> "table": "products", # 2. table name the > > >>>> changelog > > >>>>>>>> belongs to. > > >>>>>>>>> "ts_ms": 1589355504100, # 3. timestamp when > the > > >>>> change > > >>>>>>>> happened in the database system, i.e.: transaction time in the database. 
> > >>>>>>>>> "connector": "mysql", > > >>>>>>>>> …. > > >>>>>>>>> }, > > >>>>>>>>> "ts_ms": 1589355606100, # 4. timestamp when > > the > > >>>> debezium > > >>>>>>>> processed the changelog. > > >>>>>>>>> "op": "c", > > >>>>>>>>> "transaction": null > > >>>>>>>>> } > > >>>>>>>>> > > >>>>>>>>> Canal: > > >>>>>>>>> { > > >>>>>>>>> "data": [{ "id": "102", "name": "car battery" }], > > >>>>>>>>> "database": "inventory", # 1. database name the > > changelog > > >>>>>>>> belongs to. > > >>>>>>>>> "table": "products", # 2. table name the > changelog > > >>>> belongs > > >>>>>>>> to. > > >>>>>>>>> "es": 1589374013000, # 3. execution time of the > > >> change > > >>>> in > > >>>>>>>> the database system, i.e.: transaction time in the database. > > >>>>>>>>> "ts": 1589374013680, # 4. timestamp when the > > canal > > >>>>>>>> processed the changelog. > > >>>>>>>>> "isDdl": false, > > >>>>>>>>> "mysqlType": {}, > > >>>>>>>>> .... > > >>>>>>>>> } > > >>>>>>>>> > > >>>>>>>>> > > >>>>>>>>> Best > > >>>>>>>>> Leonard > > >>>>>>>>> > > >>>>>>>>>> On Sep 8, 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote: > > >>>>>>>>>> > > >>>>>>>>>> Thanks Timo ~ > > >>>>>>>>>> > > >>>>>>>>>> The FLIP was already in pretty good shape, I have only 2 > questions > > >>>> here: > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> 1. “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid > > >>>> read-only > > >>>>>>>> computed column for Kafka and can be extracted by the planner.” > > >>>>>>>>>> > > >>>>>>>>>> > > >>>>>>>>>> What are the pros of following the SQL Server syntax here? > Usually > > an > > >>>>>>>> expression's return type can be inferred automatically. But I > guess > > >>>>>>>> SQL Server does not have a function like SYSTEM_METADATA, which > > >> actually > > >>>> does > > >>>>>>>> not have a specific return type. > > >>>>>>>>>> > > >>>>>>>>>> And why not use the Oracle or MySQL syntax there? 
> > >>>>>>>>>> > > >>>>>>>>>> column_name [datatype] [GENERATED ALWAYS] AS (expression) > > >> [VIRTUAL] > > >>>>>>>>>> Which is more straightforward. > > >>>>>>>>>> > > >>>>>>>>>> 2. “SYSTEM_METADATA("offset")` returns the NULL type by > default” > > >>>>>>>>>> > > >>>>>>>>>> The default type should not be NULL because only the NULL literal > has > > >>>>>>>> that type. Usually we use ANY as the type if we do not know the > > specific > > >>>> type in > > >>>>>>>> the SQL context. ANY means the physical value can be any Java > > >> object. > > >>>>>>>>>> > > >>>>>>>>>> [1] > https://oracle-base.com/articles/11g/virtual-columns-11gr1 > > >>>>>>>>>> [2] > > >>>>>>>> > > >>>> > > >> > > > https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html > > >>>>>>>>>> > > >>>>>>>>>> Best, > > >>>>>>>>>> Danny Chan > > >>>>>>>>>> On Sep 4, 2020 at 4:48 PM +0800, Timo Walther <twal...@apache.org> wrote: > > >>>>>>>>>>> Hi everyone, > > >>>>>>>>>>> > > >>>>>>>>>>> I completely reworked FLIP-107. It now covers the full story > of how > > >> to > > >>>>>>>> read > > >>>>>>>>>>> and write metadata from/to connectors and formats. It > considers > > >>>> all of > > >>>>>>>>>>> the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It > > >>>> introduces > > >>>>>>>>>>> the concept of PERSISTED computed columns and leaves out > > >>>> partitioning > > >>>>>>>>>>> for now. > > >>>>>>>>>>> > > >>>>>>>>>>> Looking forward to your feedback. > > >>>>>>>>>>> > > >>>>>>>>>>> Regards, > > >>>>>>>>>>> Timo > > >>>>>>>>>>> > > >>>>>>>>>>> > > >>>>>>>>>>> On 04.03.20 09:45, Kurt Young wrote: > > >>>>>>>>>>>> Sorry, forgot one question. > > >>>>>>>>>>>> > > >>>>>>>>>>>> 4. Can we make the value.fields-include more orthogonal? > Like > > >> one > > >>>> can > > >>>>>>>>>>>> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". > > >>>>>>>>>>>> With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot > > >>>>>>>> configure it to > > >>>>>>>>>>>> just ignore the timestamp but keep the key. 
> > >>>>>>>>>>>> > > >>>>>>>>>>>> Best, > > >>>>>>>>>>>> Kurt > > >>>>>>>>>>>> > > >>>>>>>>>>>> > > >>>>>>>>>>>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com > > > > >>>> wrote: > > >>>>>>>>>>>> > > >>>>>>>>>>>>> Hi Dawid, > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> I have a couple of questions around key fields, actually I > > also > > >>>> have > > >>>>>>>> some > > >>>>>>>>>>>>> other questions but want to be focused on key fields first. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 1. I don't fully understand the usage of "key.fields". Is > > this > > >>>>>>>> option only > > >>>>>>>>>>>>> valid during write operations? Because for > > >>>>>>>>>>>>> reading, I can't imagine how such options can be applied. I > > >> would > > >>>>>>>> expect > > >>>>>>>>>>>>> that there might be a SYSTEM_METADATA("key") > > >>>>>>>>>>>>> to read and assign the key to a normal field? > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 2. If "key.fields" is only valid in write operations, I want > > to > > >>>>>>>> propose we > > >>>>>>>>>>>>> simplify the options by not introducing key.format.type > > and > > >>>>>>>>>>>>> other related options. I think a single "key.field" (not > > >> fields) > > >>>>>>>> would be > > >>>>>>>>>>>>> enough; users can use a UDF to calculate whatever key they > > >>>>>>>>>>>>> want before the sink. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> 3. Also I don't want to introduce "value.format.type" and > > >>>>>>>>>>>>> "value.format.xxx" with the "value" prefix. Not every > > connector > > >>>> has a > > >>>>>>>>>>>>> concept > > >>>>>>>>>>>>> of keys and values. The old parameter "format.type" is already > > good > > >>>>>>>> enough to > > >>>>>>>>>>>>> use. > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> Best, > > >>>>>>>>>>>>> Kurt > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> > > >>>>>>>>>>>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> > > >>>> wrote: > > >>>>>>>>>>>>> > > >>>>>>>>>>>>>> Thanks Dawid, > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> I have two more questions. 
> > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> SupportsMetadata > > >>>>>>>>>>>>>> Introducing SupportsMetadata sounds good to me. But I have > > >> some > > >>>>>>>> questions > > >>>>>>>>>>>>>> regarding this interface. > > >>>>>>>>>>>>>> 1) How does the source know the expected return type of > > >> each > > >>>>>>>> metadata field? > > >>>>>>>>>>>>>> 2) Where to put the metadata fields? Append to the > existing > > >>>> physical > > >>>>>>>>>>>>>> fields? > > >>>>>>>>>>>>>> If yes, I would suggest changing the signature to > > >> `TableSource > > >>>>>>>>>>>>>> appendMetadataFields(String[] metadataNames, DataType[] > > >>>>>>>> metadataTypes)` > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> SYSTEM_METADATA("partition") > > >>>>>>>>>>>>>> Can the SYSTEM_METADATA() function be nested in a > computed > > >>>> column > > >>>>>>>>>>>>>> expression? If yes, how to specify the return type of > > >>>>>>>> SYSTEM_METADATA? > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> Best, > > >>>>>>>>>>>>>> Jark > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz < > > >>>>>>>> dwysakow...@apache.org> > > >>>>>>>>>>>>>> wrote: > > >>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Hi, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 1. I thought a bit more on how the source would emit the > > >>>> columns > > >>>>>>>> and I > > >>>>>>>>>>>>>>> now see it's not exactly the same as regular columns. I > see > > a > > >>>> need > > >>>>>>>> to > > >>>>>>>>>>>>>>> elaborate a bit more on that in the FLIP as you asked, > > Jark. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I do agree mostly with Danny on how we should do that. 
One > > >>>>>>>> additional > > >>>>>>>>>>>>>>> thing I would introduce is an > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> interface SupportsMetadata { > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> boolean supportsMetadata(Set<String> metadataFields); > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> TableSource generateMetadataFields(Set<String> > > >> metadataFields); > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> } > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> This way the source would have to declare/emit only the > > >>>> requested > > >>>>>>>>>>>>>>> metadata fields. In order not to clash with user-defined > > >>>> fields, > > >>>>>>>> when > > >>>>>>>>>>>>>>> emitting the metadata field I would prepend the column > name > > >>>> with > > >>>>>>>>>>>>>>> __system_{property_name}. Therefore when requested > > >>>>>>>>>>>>>>> SYSTEM_METADATA("partition") the source would append a > > field > > >>>>>>>>>>>>>>> __system_partition to the schema. This would never be > > visible > > >>>> to > > >>>>>>>> the > > >>>>>>>>>>>>>>> user as it would be used only for the subsequent computed > > >>>> columns. > > >>>>>>>> If > > >>>>>>>>>>>>>>> that makes sense to you, I will update the FLIP with this > > >>>>>>>> description. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 2. CAST vs explicit type in computed columns > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Here I agree with Danny. It is also the current state of > > the > > >>>>>>>> proposal. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 3. Partitioning on computed column vs function > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Here I also agree with Danny. I also think those are > > >>>> orthogonal. I > > >>>>>>>> would > > >>>>>>>>>>>>>>> leave the STORED computed columns out of the > > discussion. > > >> I > > >>>>>>>> don't see > > >>>>>>>>>>>>>>> how they relate to the partitioning. I already put > both > > of > > >>>> those > > >>>>>>>>>>>>>>> cases in the document. 
We can either partition on a > > computed > > >>>>>>>> column or > > >>>>>>>>>>>>>>> use a UDF in a PARTITIONED BY clause. I am fine with > leaving > > >> out > > >>>> the > > >>>>>>>>>>>>>>> partitioning by UDF in the first version if you still > have > > >> some > > >>>>>>>>>>>>>> concerns. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> As for your question, Danny: it depends on which partitioning > > >>>> strategy > > >>>>>>>> you > > >>>>>>>>>>>>>> use. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> For the HASH partitioning strategy I thought it would > work > > as > > >>>> you > > >>>>>>>>>>>>>>> explained. It would be N = MOD(expr, num). I am not sure > > >>>> though if > > >>>>>>>> we > > >>>>>>>>>>>>>>> should introduce the PARTITIONS clause. Usually Flink > does > > >> not > > >>>> own > > >>>>>>>> the > > >>>>>>>>>>>>>>> data and the partitions are already an intrinsic property > > of > > >>>> the > > >>>>>>>>>>>>>>> underlying source, e.g. for Kafka we do not create topics, > > but > > >>>> we > > >>>>>>>> just > > >>>>>>>>>>>>>>> describe a pre-existing, pre-partitioned topic. > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> 4. timestamp vs timestamp.field vs connector.field vs ... > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> I am fine with changing it to timestamp.field to be > > >> consistent > > >>>> with > > >>>>>>>>>>>>>>> other value.fields and key.fields. Actually that was also > > my > > >>>>>>>> initial > > >>>>>>>>>>>>>>> proposal in a first draft I prepared. I changed it > > afterwards > > >>>> to > > >>>>>>>> shorten > > >>>>>>>>>>>>>>> the key. 
> > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Best, > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> Dawid > > >>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>> On 03/03/2020 09:00, Danny Chan wrote: > > >>>>>>>>>>>>>>>> Thanks Dawid for bringing up this discussion, I think it > > is > > >> a > > >>>>>>>> useful > > >>>>>>>>>>>>>>> feature ~ > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> About how the metadata outputs from the source > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> I think it is completely orthogonal; computed column push > > >>>> down is > > >>>>>>>>>>>>>>> another topic, this should not be a blocker but a > > promotion. > > >>>> If we > > >>>>>>>> do > > >>>>>>>>>>>>>> not > > >>>>>>>>>>>>>>> have any filters on the computed column, there is no need > > to > > >>>> do any > > >>>>>>>>>>>>>>> pushdowns; the source node just emits the complete record > > with > > >>>> full > > >>>>>>>>>>>>>> metadata > > >>>>>>>>>>>>>>> with the declared physical schema, then when generating > the > > >>>> virtual > > >>>>>>>>>>>>>>> columns, we would extract the metadata info and output them as > > >> full > > >>>>>>>>>>>>>> columns (with > > >>>>>>>>>>>>>>> the full schema). > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> About the type of the metadata column > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> Personally I prefer an explicit type instead of CAST; they > > are > > >>>>>>>> semantically > > >>>>>>>>>>>>>>> equivalent, though an explicit type is more straightforward > > and > > >>>> we can > > >>>>>>>>>>>>>> declare > > >>>>>>>>>>>>>>> the nullable attribute there. 
> > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> About option A: partitioning based on a computed column > VS > > >>>> option > > >>>>>>>> B: > > >>>>>>>>>>>>>>> partitioning with just a function > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> From the FLIP, it seems that B's partitioning is > just > > a > > >>>> strategy > > >>>>>>>> when > > >>>>>>>>>>>>>>> writing data; the partition column is not included in the > > >> table > > >>>>>>>> schema, > > >>>>>>>>>>>>>> so > > >>>>>>>>>>>>>>> it's just useless when reading from that. > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> - Compared to A, we do not need to generate the > partition > > >>>> column > > >>>>>>>> when > > >>>>>>>>>>>>>>> selecting from the table (but we do for INSERT INTO) > > >>>>>>>>>>>>>>>> - For A we can also mark the column as STORED when we > want > > >> to > > >>>>>>>> persist > > >>>>>>>>>>>>>>> that > > >>>>>>>>>>>>>>>> > > >>>>>>>>>>>>>>>> So in my opinion they are orthogonal, and we can support > > both. I > > >>>> saw > > >>>>>>>> that > > >>>>>>>>>>>>>>> MySQL/Oracle[1][2] suggest also defining the > > >> PARTITIONS > > >>>>>>>> num, and > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>> partitions are managed under a "tablenamespace"; the > > >> partition > > >>>> in > > >>>>>>>> which > > >>>>>>>>>>>>>> the > > >>>>>>>>>>>>>>> record is stored is partition number N, where N = > MOD(expr, > > >>>> num). > > >>>>>>>> For > > >>>>>>>>>>>>>> your > > >>>>>>>>>>>>>>> design, in which partition would the record be persisted? 
[1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
[2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270

Best,
Danny Chan

On 2 Mar 2020 at 6:16 PM +0800, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Jark,

Ad. 2 I added a section to discuss the relation to FLIP-63.

Ad. 3 Yes, I also tried to somewhat keep the hierarchy of properties. Therefore you have the key.format.type.

I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into an Option/Rejected alternatives section.

I agree timestamp, key.*, value.* are connector properties. The reason I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property, as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or, if we go with FLINK-12557, elasticsearch):

elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....

I am fine with doing it though if this is the preferred approach in the community.

Ad in-line comments:

I forgot to update the `value.fields.include` property. It should be value.fields-include. Which I think you also suggested in the comment, right?

As for the CAST vs declaring the output type of a computed column: I think it's better not to use CAST, but to declare a type of an expression and later on infer the output type of SYSTEM_METADATA. The reason is that I think this way it will be easier to implement e.g. filter push downs when working with the native types of the source. E.g. in the case of Kafka's offset, I think it's better to push down long rather than string. This could let us push down an expression like offset > 12345 && offset < 59382. Otherwise we would have to push down cast(offset, long) > 12345 && cast(offset, long) < 59382. Moreover, I think we need to introduce the type for computed columns anyway to support functions that infer the output type based on the expected return type.

As for the computed column push down: yes, SYSTEM_METADATA would have to be pushed down to the source.
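A sketch of the push-down difference described above, assuming the offset metadata column is exposed with its native BIGINT type rather than a STRING:

```sql
-- With offset typed natively as BIGINT, this range predicate can be
-- pushed down to the source as-is:
SELECT * FROM kafka_table
WHERE offset > 12345 AND offset < 59382;

-- If offset were exposed as STRING, the planner would instead have to
-- push down the casted form:
-- CAST(offset AS BIGINT) > 12345 AND CAST(offset AS BIGINT) < 59382
```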
If it is not possible, the planner should fail. As far as I know computed columns push down will be part of the source rework, won't it? ;)

As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a udf in PARTITIONED BY vs partitioning by a computed column is that when you partition by a computed column, this column must also be computed when reading the table. If you use a udf in the PARTITIONED BY, the expression is computed only when inserting into the table.

Hope this answers some of your questions. Looking forward to further suggestions.

Best,
Dawid


On 02/03/2020 05:18, Jark Wu wrote:

Hi,

Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from sources is an important feature for streaming users.

In general, I agree with the proposal of the FLIP. I will leave my thoughts and comments here:

1) +1 to use connector properties instead of introducing a HEADER keyword, for the reason you mentioned in the FLIP.
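The read-path difference Dawid describes can be sketched as follows (table names and expressions are hypothetical, chosen only for illustration):

```sql
-- Partitioning by a computed column: dt is part of the schema and is
-- computed also when reading the table.
CREATE TABLE t1 (
  id BIGINT,
  ts TIMESTAMP(3),
  dt AS DATE_FORMAT(ts, 'yyyy-MM-dd')
) PARTITIONED BY (dt) WITH ( ... );

-- Partitioning by an expression directly (hypothetical syntax): the
-- expression is evaluated only on insert, and no dt column is readable.
CREATE TABLE t2 (
  id BIGINT,
  ts TIMESTAMP(3)
) PARTITIONED BY (DATE_FORMAT(ts, 'yyyy-MM-dd')) WITH ( ... );
```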
2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?

3) Currently, properties are hierarchical in Flink SQL. Shall we make the newly introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (Actually, I prefer "kafka.timestamp", which is another improvement for properties, FLINK-12557.) A single "timestamp" in properties may mislead users into thinking the field is a rowtime attribute.

I also left some minor comments in the FLIP.

Thanks,
Jark


On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload, the majority (if not all) of the sources expose additional information.
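The two property-key styles from point 3), side by side as a sketch (keys illustrative, not final):

```sql
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  -- flat key, as in the first version of the proposal; may be mistaken
  -- for a rowtime attribute:
  'timestamp' = 'ts'
  -- vs. connector-prefixed key in the FLINK-12557 direction:
  -- 'kafka.timestamp' = 'ts'
);
```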
It can be simply read-only metadata such as offset or ingestion time, or read and write parts of the record that contain data but additionally serve different purposes (partitioning, compaction etc.), e.g. key or timestamp in Kafka.

We should make it possible to read and write data from all of those locations. In this proposal I discuss reading partitioning data; for completeness, this proposal also discusses the partitioning when writing data out.

I am looking forward to your comments.

You can access the FLIP here:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode

Best,

Dawid