Thanks Timo. The updates to `Schema` and the LIKE clause look good to me.
Best, Jark On Tue, 15 Sep 2020 at 10:30, Leonard Xu <xbjt...@gmail.com> wrote: > Hi, Timo > > Thanks for your explanation, it makes sense to me. > > Best, > Leonard > > > >> Hi, Timo > >> Thanks for the update > >> I have a minor suggestion about the debezium metadata key, > >> Could we use the original debezium key rather than introduce new keys? > >> debezium-json.schema => debezium-json.schema > >> debezium-json.ingestion-timestamp => debezium-json.ts_ms > >> debezium-json.source.database => debezium-json.source.db > >> debezium-json.source.schema => debezium-json.source.schema > >> debezium-json.source.table => debezium-json.source.table > >> debezium-json.source.timestamp => debezium-json.source.ts_ms > >> debezium-json.source.properties => debezium-json.source > MAP<STRING, STRING> > >> Users who are familiar with Debezium will understand the keys more easily, and > the key syntax is more JSON-path like. WDYT? > >> The other part looks really good to me. > >> Regards, > >> Leonard > >>> On 10 Sep 2020, at 18:26, Aljoscha Krettek <aljos...@apache.org> wrote: > >>> > >>> I've only been watching this from the sidelines but that latest > proposal looks very good to me! > >>> > >>> Aljoscha > >>> > >>> On 10.09.20 12:20, Kurt Young wrote: > >>>> The new syntax looks good to me. > >>>> Best, > >>>> Kurt > >>>> On Thu, Sep 10, 2020 at 5:57 PM Jark Wu <imj...@gmail.com> wrote: > >>>>> Hi Timo, > >>>>> > >>>>> I have one minor suggestion. > >>>>> Maybe the default data type of `timestamp` can be `TIMESTAMP(3) WITH > >>>>> LOCAL TIME ZONE`, because this is the type that users want to use; this can > >>>>> avoid unnecessary casting. > >>>>> Besides, currently, the BIGINT is cast to a timestamp in seconds, so > the > >>>>> implicit cast may not work... > >>>>> > >>>>> I don't have other objections. But maybe we should wait for the > >>>>> opinion from @Kurt on the new syntax. 
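For illustration, Jark's suggestion above could look roughly like the following sketch (the metadata syntax was still being finalized at this point in the thread; the column, topic, and format names here are made up):

```sql
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  -- Kafka record timestamp exposed directly as an instant;
  -- no manual BIGINT-to-timestamp cast is required.
  ts TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);
```

Declaring the column as `TIMESTAMP(3) WITH LOCAL TIME ZONE` sidesteps the seconds-vs-milliseconds ambiguity of an implicit BIGINT cast that Jark points out.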
> >>>>> > >>>>> Best, > >>>>> Jark > >>>>> > >>>>> > >>>>> On Thu, 10 Sep 2020 at 16:21, Danny Chan <yuzhao....@gmail.com> > wrote: > >>>>> > >>>>>> Thanks for driving this Timo, +1 for voting ~ > >>>>>> > >>>>>> Best, > >>>>>> Danny Chan > >>>>>> On 10 Sep 2020 at 15:47 +0800, Timo Walther <twal...@apache.org> wrote: > >>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP > with the > >>>>>>> outcome. I think the result is very powerful but also very easy to > >>>>>>> declare. Thanks for all the contributions. > >>>>>>> > >>>>>>> If there are no objections, I would continue with a vote. > >>>>>>> > >>>>>>> What do you think? > >>>>>>> > >>>>>>> Regards, > >>>>>>> Timo > >>>>>>> > >>>>>>> > >>>>>>> On 09.09.20 16:52, Timo Walther wrote: > >>>>>>>> "If virtual by default, when a user types "timestamp int" ==> > >>>>>> persisted > >>>>>>>> column, then adds a "metadata" after that ==> virtual column, then > >>>>>> adds > >>>>>>>> a "persisted" after that ==> persisted column." > >>>>>>>> > >>>>>>>> Thanks for this nice mental model explanation, Jark. This makes > total > >>>>>>>> sense to me. Also making the most common case as short as just > >>>>>>>> adding `METADATA` is a very good idea. Thanks, Danny! > >>>>>>>> > >>>>>>>> Let me update the FLIP again with all these ideas. > >>>>>>>> > >>>>>>>> Regards, > >>>>>>>> Timo > >>>>>>>> > >>>>>>>> > >>>>>>>> On 09.09.20 15:03, Jark Wu wrote: > >>>>>>>>> I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM > >>>>>>>>> 'my-timestamp-field'] [VIRTUAL] > >>>>>>>>> Especially I like the shortcut: timestamp INT METADATA, this > makes > >>>>>> the > >>>>>>>>> most > >>>>>>>>> common case supported in the simplest way. > >>>>>>>>> > >>>>>>>>> I also think the default should be "PERSISTED", so VIRTUAL is > >>>>>> optional > >>>>>>>>> when > >>>>>>>>> you are accessing read-only metadata. Because: > >>>>>>>>> 1. 
The "timestamp INT METADATA" should be a normal column, > because > >>>>>>>>> "METADATA" is just a modifier to indicate it is from metadata, a > >>>>>> normal > >>>>>>>>> column should be persisted. > >>>>>>>>> If virtual by default, when a user types "timestamp int" ==> > >>>>>>>>> persisted > >>>>>>>>> column, then adds a "metadata" after that ==> virtual column, > then > >>>>>> adds a > >>>>>>>>> "persisted" after that ==> persisted column. > >>>>>>>>> I think this looks reversed several times and makes users > >>>>>> confused. > >>>>>>>>> Physical fields are also prefixed with "fieldName TYPE", so > >>>>>> "timestamp > >>>>>>>>> INT > >>>>>>>>> METADATA" is persisted is very straightforward. > >>>>>>>>> 2. From the collected user question [1], we can see that > "timestamp" > >>>>>>>>> is the > >>>>>>>>> most common use case. "timestamp" is a read-write metadata. > >>>>>> Persisted by > >>>>>>>>> default doesn't break the reading behavior. > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Jark > >>>>>>>>> > >>>>>>>>> [1]: https://issues.apache.org/jira/browse/FLINK-15869 > >>>>>>>>> > >>>>>>>>> On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com> > wrote: > >>>>>>>>> > >>>>>>>>>> Thanks @Dawid for the nice summary, I think you catch all > >>>>>> opinions of > >>>>>>>>>> the > >>>>>>>>>> long discussion well. > >>>>>>>>>> > >>>>>>>>>> @Danny > >>>>>>>>>> “ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL] > >>>>>>>>>> Note that the "FROM 'field name'" is only needed when the name > >>>>>>>>>> conflict > >>>>>>>>>> with the declared table column name, when there are no > >>>>>> conflicts, > >>>>>>>>>> we can > >>>>>>>>>> simplify it to > >>>>>>>>>> timestamp INT METADATA" > >>>>>>>>>> > >>>>>>>>>> I really like the proposal, there is no confusion with computed > >>>>>>>>>> column any > >>>>>>>>>> more, and it’s concise enough. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> @Timo @Dawid > >>>>>>>>>> “We use `SYSTEM_TIME` for temporal tables. 
I think prefixing > with > >>>>>> SYSTEM > >>>>>>>>>> makes it clearer that it comes magically from the system.” > >>>>>>>>>> “As for the issue of shortening the SYSTEM_METADATA to METADATA. > >>>>>> Here I > >>>>>>>>>> very much prefer the SYSTEM_ prefix.” > >>>>>>>>>> > >>>>>>>>>> I think `SYSTEM_TIME` is different with `SYSTEM_METADATA ` a > lot, > >>>>>>>>>> First of all, the word `TIME` has broad meanings but the word > >>>>>>>>>> `METADATA ` > >>>>>>>>>> not, `METADATA ` has specific meaning, > >>>>>>>>>> Secondly, `FOR SYSTEM_TIME AS OF` exists in SQL standard but > >>>>>>>>>> `SYSTEM_METADATA ` not. > >>>>>>>>>> Personally, I like more simplify way,sometimes less is more. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> Leonard > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Timo Walther <twal...@apache.org> 于2020年9月9日周三 下午6:41写道: > >>>>>>>>>>> > >>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>> > >>>>>>>>>>>> "key" and "value" in the properties are a special case > >>>>>> because they > >>>>>>>>>>>> need > >>>>>>>>>>>> to configure a format. So key and value are more than just > >>>>>> metadata. > >>>>>>>>>>>> Jark's example for setting a timestamp would work but as the > >>>>>> FLIP > >>>>>>>>>>>> discusses, we have way more metadata fields like headers, > >>>>>>>>>>>> epoch-leader, > >>>>>>>>>>>> etc. Having a property for all of this metadata would mess up > >>>>>> the WITH > >>>>>>>>>>>> section entirely. Furthermore, we also want to deal with > >>>>>> metadata from > >>>>>>>>>>>> the formats. Solving this through properties as well would > >>>>>> further > >>>>>>>>>>>> complicate the property design. 
> >>>>>>>>>>>> > >>>>>>>>>>>> Personally, I still like the computed column design more > >>>>>> because it > >>>>>>>>>>>> allows to have full flexibility to compute the final column: > >>>>>>>>>>>> > >>>>>>>>>>>> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS > >>>>>>>>>> TIMESTAMP(3))) > >>>>>>>>>>>> > >>>>>>>>>>>> Instead of having a helper column and a real column in the > >>>>>> table: > >>>>>>>>>>>> > >>>>>>>>>>>> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)) > >>>>>>>>>>>> realTimestamp AS adjustTimestamp(helperTimestamp) > >>>>>>>>>>>> > >>>>>>>>>>>> But I see that the discussion leans towards: > >>>>>>>>>>>> > >>>>>>>>>>>> timestamp INT SYSTEM_METADATA("ts") > >>>>>>>>>>>> > >>>>>>>>>>>> Which is fine with me. It is the shortest solution, because > >>>>>> we don't > >>>>>>>>>>>> need additional CAST. We can discuss the syntax, so that > >>>>>> confusion > >>>>>>>>>>>> with > >>>>>>>>>>>> computed columns can be avoided. > >>>>>>>>>>>> > >>>>>>>>>>>> timestamp INT USING SYSTEM_METADATA("ts") > >>>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts") > >>>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED > >>>>>>>>>>>> > >>>>>>>>>>>> We use `SYSTEM_TIME` for temporal tables. I think prefixing > >>>>>> with > >>>>>>>>>>>> SYSTEM > >>>>>>>>>>>> makes it clearer that it comes magically from the system. > >>>>>>>>>>>> > >>>>>>>>>>>> What do you think? > >>>>>>>>>>>> > >>>>>>>>>>>> Regards, > >>>>>>>>>>>> Timo > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> On 09.09.20 11:41, Jark Wu wrote: > >>>>>>>>>>>>> Hi Danny, > >>>>>>>>>>>>> > >>>>>>>>>>>>> This is not Oracle and MySQL computed column syntax, > >>>>>> because there is > >>>>>>>>>> no > >>>>>>>>>>>>> "AS" after the type. 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>> > >>>>>>>>>>>>> If we want to use "offset INT SYSTEM_METADATA("offset")", > >>>>>> then I > >>>>>>>>>>>>> think > >>>>>>>>>> we > >>>>>>>>>>>>> must further discuss the "PERSISTED" or "VIRTUAL" keyword > >>>>>> for > >>>>>>>>>> the query-sink > >>>>>>>>>>>>> schema problem. > >>>>>>>>>>>>> Personally, I think we can use a shorter keyword "METADATA" > >>>>>> for > >>>>>>>>>>>>> "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a > >>>>>> system > >>>>>>>>>>>> function > >>>>>>>>>>>>> and confuses users into thinking this is a computed column. > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best, > >>>>>>>>>>>>> Jark > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Wed, 9 Sep 2020 at 17:23, Danny Chan < > >>>>>> danny0...@apache.org> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> "offset INT SYSTEM_METADATA("offset")" > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> This is actually Oracle- or MySQL-style computed column > >>>>>> syntax. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> "You are right that one could argue that "timestamp", > >>>>>> "headers" are > >>>>>>>>>>>>>> something like "key" and "value"" > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I have the same feeling: both key/value and headers/timestamp > >>>>>> are > >>>>>>>>>> *real* > >>>>>>>>>>>>>> data > >>>>>>>>>>>>>> stored in the consumed record, they are not computed or > >>>>>> generated. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> "Trying to solve everything via properties sounds rather > >>>>>> like a hack > >>>>>>>>>> to > >>>>>>>>>>>>>> me" > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Things are not that hacky if we can unify the routines or > >>>>>> the > >>>>>>>>>> definitions > >>>>>>>>>>>>>> (all from the computed column way or all from the table > >>>>>> options). I > >>>>>>>>>> also > >>>>>>>>>>>>>> think that it is hacky that we mix 2 kinds of syntax > >>>>>> for > >>>>>>>>>> different > >>>>>>>>>>>>>> kinds of metadata (read-only and read-write). 
In this > >>>>>> FLIP, we > >>>>>>>>>>>>>> declare > >>>>>>>>>>>> the > >>>>>>>>>>>>>> Kafka key fields with table options but SYSTEM_METADATA > >>>>>> for other > >>>>>>>>>>>> metadata, > >>>>>>>>>>>>>> that is a hacky thing or something in-consistent. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Kurt Young <ykt...@gmail.com> 于2020年9月9日周三 下午4:48写道: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I would vote for `offset INT > >>>>>> SYSTEM_METADATA("offset")`. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I don't think we can stick with the SQL standard in DDL > >>>>>> part > >>>>>>>>>>>>>>> forever, > >>>>>>>>>>>>>>> especially as there are more and more > >>>>>>>>>>>>>>> requirements coming from different connectors and > >>>>>> external systems. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>> Kurt > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther < > >>>>>> twal...@apache.org> > >>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Hi Jark, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> now we are back at the original design proposed by > >>>>>> Dawid :D > >>>>>>>>>>>>>>>> Yes, we > >>>>>>>>>>>>>>>> should be cautious about adding new syntax. But the > >>>>>> length of this > >>>>>>>>>>>>>>>> discussion shows that we are looking for a good > >>>>>> long-term > >>>>>>>>>>>>>>>> solution. > >>>>>>>>>> In > >>>>>>>>>>>>>>>> this case I would rather vote for a deep integration > >>>>>> into the > >>>>>>>>>> syntax. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Computed columns are also not SQL standard compliant. > >>>>>> And our > >>>>>>>>>>>>>>>> DDL is > >>>>>>>>>>>>>>>> neither, so we have some degree of freedom here. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Trying to solve everything via properties sounds > >>>>>> rather like a > >>>>>>>>>>>>>>>> hack > >>>>>>>>>> to > >>>>>>>>>>>>>>>> me. You are right that one could argue that > >>>>>> "timestamp", "headers" > >>>>>>>>>> are > >>>>>>>>>>>>>>>> something like "key" and "value". 
However, mixing > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> `offset AS SYSTEM_METADATA("offset")` > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> `'timestamp.field' = 'ts'` > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> looks more confusing to users that an explicit > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)` > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> `offset INT SYSTEM_METADATA("offset")` > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> that is symetric for both source and sink. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> What do others think? > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>> Timo > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On 09.09.20 10:09, Jark Wu wrote: > >>>>>>>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I think we have a conclusion that the writable > >>>>>> metadata shouldn't > >>>>>>>>>> be > >>>>>>>>>>>>>>>>> defined as a computed column, but a normal column. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> "timestamp STRING SYSTEM_METADATA('timestamp')" is > >>>>>> one of the > >>>>>>>>>>>>>>> approaches. > >>>>>>>>>>>>>>>>> However, it is not SQL standard compliant, we need > >>>>>> to be cautious > >>>>>>>>>>>>>>> enough > >>>>>>>>>>>>>>>>> when adding new syntax. > >>>>>>>>>>>>>>>>> Besides, we have to introduce the `PERSISTED` or > >>>>>> `VIRTUAL` > >>>>>>>>>>>>>>>>> keyword > >>>>>>>>>> to > >>>>>>>>>>>>>>>>> resolve the query-sink schema problem if it is > >>>>>> read-only > >>>>>>>>>>>>>>>>> metadata. > >>>>>>>>>>>>>> That > >>>>>>>>>>>>>>>>> adds more stuff to learn for users. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> From my point of view, the "timestamp", > >>>>>> "headers" are something > >>>>>>>>>> like > >>>>>>>>>>>>>>>> "key" > >>>>>>>>>>>>>>>>> and "value" that stores with the real data. 
So why > >>>>>> not define the > >>>>>>>>>>>>>>>>> "timestamp" in the same way with "key" by using a > >>>>>>>>>>>>>>>>> "timestamp.field" > >>>>>>>>>>>>>>>>> connector option? > >>>>>>>>>>>>>>>>> On the other side, the read-only metadata, such as > >>>>>> "offset", > >>>>>>>>>>>>>> shouldn't > >>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>> defined as a normal column. So why not use the > >>>>>> existing computed > >>>>>>>>>>>>>> column > >>>>>>>>>>>>>>>>> syntax for such metadata? Then we don't have the > >>>>>> query-sink > >>>>>>>>>>>>>>>>> schema > >>>>>>>>>>>>>>>> problem. > >>>>>>>>>>>>>>>>> So here is my proposal: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> CREATE TABLE kafka_table ( > >>>>>>>>>>>>>>>>> id BIGINT, > >>>>>>>>>>>>>>>>> name STRING, > >>>>>>>>>>>>>>>>> col1 STRING, > >>>>>>>>>>>>>>>>> col2 STRING, > >>>>>>>>>>>>>>>>> ts TIMESTAMP(3) WITH LOCAL TIME ZONE, -- ts > >>>>>> is a normal > >>>>>>>>>> field, > >>>>>>>>>>>>>> so > >>>>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>>> be read and written. > >>>>>>>>>>>>>>>>> offset AS SYSTEM_METADATA("offset") > >>>>>>>>>>>>>>>>> ) WITH ( > >>>>>>>>>>>>>>>>> 'connector' = 'kafka', > >>>>>>>>>>>>>>>>> 'topic' = 'test-topic', > >>>>>>>>>>>>>>>>> 'key.fields' = 'id, name', > >>>>>>>>>>>>>>>>> 'key.format' = 'csv', > >>>>>>>>>>>>>>>>> 'value.format' = 'avro', > >>>>>>>>>>>>>>>>> 'timestamp.field' = 'ts' -- define the > >>>>>> mapping of Kafka > >>>>>>>>>>>>>> timestamp > >>>>>>>>>>>>>>>>> ); > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> INSERT INTO kafka_table > >>>>>>>>>>>>>>>>> SELECT id, name, col1, col2, rowtime FROM > >>>>>> another_table; > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> I think this can solve all the problems without > >>>>>> introducing > >>>>>>>>>>>>>>>>> any new > >>>>>>>>>>>>>>>> syntax. > >>>>>>>>>>>>>>>>> The only minor disadvantage is that we separate the > >>>>>> definition > >>>>>>>>>>>>>>> way/syntax > >>>>>>>>>>>>>>>>> of read-only metadata and read-write fields. 
> >>>>>>>>>>>>>>>>> However, I don't think this is a big problem. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>> Jark > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Wed, 9 Sep 2020 at 15:09, Timo Walther < > >>>>>> twal...@apache.org> > >>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Hi Kurt, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> thanks for sharing your opinion. I'm totally up > >>>>>> for not reusing > >>>>>>>>>>>>>>> computed > >>>>>>>>>>>>>>>>>> columns. I think Jark was a big supporter of this > >>>>>> syntax, @Jark > >>>>>>>>>> are > >>>>>>>>>>>>>>> you > >>>>>>>>>>>>>>>>>> fine with this as well? The non-computed column > >>>>>> approach was > >>>>>>>>>>>>>>>>>> only > >>>>>>>>>> a > >>>>>>>>>>>>>>>>>> "slightly rejected alternative". > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Furthermore, we would need to think about how > >>>>>> such a new design > >>>>>>>>>>>>>>>>>> influences the LIKE clause though. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> However, we should still keep the `PERSISTED` > >>>>>> keyword as it > >>>>>>>>>>>>>> influences > >>>>>>>>>>>>>>>>>> the query->sink schema. If you look at the list > >>>>>> of metadata for > >>>>>>>>>>>>>>> existing > >>>>>>>>>>>>>>>>>> connectors and formats, we currently offer only > >>>>>> two writable > >>>>>>>>>>>>>> metadata > >>>>>>>>>>>>>>>>>> fields. Otherwise, one would need to declare two > >>>>>> tables > >>>>>>>>>>>>>>>>>> whenever a > >>>>>>>>>>>>>>>>>> metadata column is read (one for the source, one > >>>>>> for the sink). > >>>>>>>>>>>>>> This > >>>>>>>>>>>>>>>>>> can be quite inconvenient e.g. for just reading > >>>>>> the topic. 
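A sketch of the query-to-sink schema issue Timo describes above, using the `METADATA`/`VIRTUAL` syntax proposed elsewhere in this thread (the syntax was still under discussion, and the table and column names are illustrative): a read-only metadata column that is excluded from the sink schema lets a single table definition serve both reading and writing.

```sql
CREATE TABLE kafka_table (
  id BIGINT,
  name STRING,
  -- Read-only metadata: VIRTUAL excludes it from the sink (INSERT) schema.
  `offset` INT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-topic',
  'format' = 'json'
);

-- Works without supplying `offset`: the virtual column is not part of the
-- table's sink schema, so no second table definition is needed for writing.
INSERT INTO kafka_table SELECT id, name FROM another_table;
```

Without such a keyword, the `offset` column would be part of the sink schema and every `INSERT INTO` would be forced to provide a value for it, which is what motivates declaring two tables otherwise.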
> >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>> Timo > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On 09.09.20 08:52, Kurt Young wrote: > >>>>>>>>>>>>>>>>>>> I also share the concern that reusing the > >>>>>> computed column > >>>>>>>>>>>>>>>>>>> syntax > >>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>> have > >>>>>>>>>>>>>>>>>>> different semantics > >>>>>>>>>>>>>>>>>>> would confuse users a lot. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Besides, I think metadata fields are > >>>>>> conceptually not the same > >>>>>>>>>> with > >>>>>>>>>>>>>>>>>>> computed columns. The metadata > >>>>>>>>>>>>>>>>>>> field is a connector specific thing and it only > >>>>>> contains the > >>>>>>>>>>>>>>>> information > >>>>>>>>>>>>>>>>>>> that where does the field come > >>>>>>>>>>>>>>>>>>> from (during source) or where does the field > >>>>>> need to write to > >>>>>>>>>>>>>> (during > >>>>>>>>>>>>>>>>>>> sink). It's more similar with normal > >>>>>>>>>>>>>>>>>>> fields, with assumption that all these fields > >>>>>> need going to the > >>>>>>>>>>>>>> data > >>>>>>>>>>>>>>>>>> part. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thus I'm more lean to the rejected alternative > >>>>>> that Timo > >>>>>>>>>> mentioned. > >>>>>>>>>>>>>>>> And I > >>>>>>>>>>>>>>>>>>> think we don't need the > >>>>>>>>>>>>>>>>>>> PERSISTED keyword, SYSTEM_METADATA should be > >>>>>> enough. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> During implementation, the framework only needs > >>>>>> to pass such > >>>>>>>>>>>>>> <field, > >>>>>>>>>>>>>>>>>>> metadata field> information to the > >>>>>>>>>>>>>>>>>>> connector, and the logic of handling such > >>>>>> fields inside the > >>>>>>>>>>>>>> connector > >>>>>>>>>>>>>>>>>>> should be straightforward. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Regarding the downside Timo mentioned: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> The disadvantage is that users cannot call > >>>>>> UDFs or parse > >>>>>>>>>>>>>> timestamps. 
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> I think this is fairly simple to solve. Since > >>>>>> the metadata > >>>>>>>>>>>>>>>>>>> field > >>>>>>>>>>>>>>> isn't > >>>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>> computed column anymore, we can support > >>>>>>>>>>>>>>>>>>> referencing such fields in the computed column. > >>>>>> For example: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table ( > >>>>>>>>>>>>>>>>>>> id BIGINT, > >>>>>>>>>>>>>>>>>>> name STRING, > >>>>>>>>>>>>>>>>>>> timestamp STRING > >>>>>> SYSTEM_METADATA("timestamp"), // > >>>>>>>>>>>>>>>>>>> get the > >>>>>>>>>>>>>>>>>> timestamp > >>>>>>>>>>>>>>>>>>> field from metadata > >>>>>>>>>>>>>>>>>>> ts AS to_timestamp(timestamp) // normal > >>>>>> computed > >>>>>>>>>>>>>>>>>>> column, > >>>>>>>>>>>>>> parse > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> string to TIMESTAMP type by using the metadata > >>>>>> field > >>>>>>>>>>>>>>>>>>> ) WITH ( > >>>>>>>>>>>>>>>>>>> ... > >>>>>>>>>>>>>>>>>>> ) > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>>>>>>> Kurt > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther > >>>>>>>>>>>>>>>>>>> <twal...@apache.org > >>>>>>>>>>> > >>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Hi Leonard, > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> the only alternative I see is that we > >>>>>> introduce a concept that > >>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>> completely different to computed columns. > >>>>>> This is also > >>>>>>>>>>>>>>>>>>>> mentioned > >>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> rejected alternative section of the FLIP. 
> >>>>>> Something like: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table ( > >>>>>>>>>>>>>>>>>>>> id BIGINT, > >>>>>>>>>>>>>>>>>>>> name STRING, > >>>>>>>>>>>>>>>>>>>> timestamp INT > >>>>>> SYSTEM_METADATA("timestamp") PERSISTED, > >>>>>>>>>>>>>>>>>>>> headers MAP<STRING, BYTES> > >>>>>> SYSTEM_METADATA("headers") > >>>>>>>>>>>>>>> PERSISTED > >>>>>>>>>>>>>>>>>>>> ) WITH ( > >>>>>>>>>>>>>>>>>>>> ... > >>>>>>>>>>>>>>>>>>>> ) > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> This way we would avoid confusion at all and > >>>>>> can easily map > >>>>>>>>>>>>>> columns > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>> metadata columns. The disadvantage is that > >>>>>> users cannot call > >>>>>>>>>> UDFs > >>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>>>>>> parse timestamps. This would need to be done > >>>>>> in a real > >>>>>>>>>>>>>>>>>>>> computed > >>>>>>>>>>>>>>>> column. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> I'm happy about better alternatives. > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>> Timo > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On 08.09.20 15:37, Leonard Xu wrote: > >>>>>>>>>>>>>>>>>>>>> HI, Timo > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Thanks for driving this FLIP. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Sorry but I have a concern about Writing > >>>>>> metadata via > >>>>>>>>>>>>>>>> DynamicTableSink > >>>>>>>>>>>>>>>>>>>> section: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table ( > >>>>>>>>>>>>>>>>>>>>> id BIGINT, > >>>>>>>>>>>>>>>>>>>>> name STRING, > >>>>>>>>>>>>>>>>>>>>> timestamp AS > >>>>>> CAST(SYSTEM_METADATA("timestamp") AS > >>>>>>>>>>>>>>>>>>>>> BIGINT) > >>>>>>>>>>>>>>>>>> PERSISTED, > >>>>>>>>>>>>>>>>>>>>> headers AS > >>>>>> CAST(SYSTEM_METADATA("headers") AS > >>>>>>>>>>>>>>>>>>>>> MAP<STRING, > >>>>>>>>>>>>>>>> BYTES>) > >>>>>>>>>>>>>>>>>>>> PERSISTED > >>>>>>>>>>>>>>>>>>>>> ) WITH ( > >>>>>>>>>>>>>>>>>>>>> ... 
> >>>>>>>>>>>>>>>>>>>>> ) > >>>>>>>>>>>>>>>>>>>>> An insert statement could look like: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> INSERT INTO kafka_table VALUES ( > >>>>>>>>>>>>>>>>>>>>> (1, "ABC", 1599133672, MAP('checksum', > >>>>>>>>>>>>>> computeChecksum(...))) > >>>>>>>>>>>>>>>>>>>>> ) > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> The proposed INSERT syntax does not make > >>>>>> sense to me, > >>>>>>>>>>>>>>>>>>>>> because it > >>>>>>>>>>>>>>>>>> contains > >>>>>>>>>>>>>>>>>>>> a computed (generated) column. > >>>>>>>>>>>>>>>>>>>>> Both SQL Server and PostgreSQL do not allow > >>>>>> inserting > >>>>>>>>>>>>>>>>>>>>> values into > >>>>>>>>>>>>>>>> computed > >>>>>>>>>>>>>>>>>>>> columns even if they are persisted; this breaks > >>>>>> the generated > >>>>>>>>>>>>>>>>>>>> column > >>>>>>>>>>>>>>>>>> semantics > >>>>>>>>>>>>>>>>>>>> and may confuse users a lot. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> For the SQL Server computed column[1]: > >>>>>>>>>>>>>>>>>>>>>> column_name AS computed_column_expression > >>>>>> [ PERSISTED [ NOT > >>>>>>>>>>>>>> NULL ] > >>>>>>>>>>>>>>>>>> ]... > >>>>>>>>>>>>>>>>>>>>>> NOTE: A computed column cannot be the > >>>>>> target of an INSERT or > >>>>>>>>>>>>>>> UPDATE > >>>>>>>>>>>>>>>>>>>> statement. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> For the PostgreSQL generated column[2]: > >>>>>>>>>>>>>>>>>>>>>> height_in numeric GENERATED ALWAYS > >>>>>> AS (height_cm / > >>>>>>>>>>>>>>>>>>>>>> 2.54) > >>>>>>>>>>>>>>> STORED > >>>>>>>>>>>>>>>>>>>>>> NOTE: A generated column cannot be > >>>>>> written to directly. In > >>>>>>>>>>>>>> INSERT > >>>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>>>>>> UPDATE commands, a value cannot be specified > >>>>>> for a generated > >>>>>>>>>>>>>> column, > >>>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>>>>> the keyword DEFAULT may be specified. 
> >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> It shouldn't be allowed to set/update a value > >>>>>> for a generated > >>>>>>>>>> column, > >>>>>>>>>>>>>>>> after > >>>>>>>>>>>>>>>>>>>> looking up SQL:2016: > >>>>>>>>>>>>>>>>>>>>>> <insert statement> ::= > >>>>>>>>>>>>>>>>>>>>>> INSERT INTO <insertion target> <insert > >>>>>> columns and source> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> If <contextually typed table value > >>>>>> constructor> CTTVC is > >>>>>>>>>>>>>>> specified, > >>>>>>>>>>>>>>>>>>>> then every <contextually typed row > >>>>>>>>>>>>>>>>>>>>>> value constructor element> simply > >>>>>> contained in CTTVC whose > >>>>>>>>>>>>>>>>>> positionally > >>>>>>>>>>>>>>>>>>>> corresponding <column name> > >>>>>>>>>>>>>>>>>>>>>> in <insert column list> references a > >>>>>> column of which some > >>>>>>>>>>>>>>> underlying > >>>>>>>>>>>>>>>>>>>> column is a generated column shall > >>>>>>>>>>>>>>>>>>>>>> be a <default specification>. > >>>>>>>>>>>>>>>>>>>>>> A <default specification> specifies the > >>>>>> default value of > >>>>>>>>>>>>>>>>>>>>>> some > >>>>>>>>>>>>>>>>>>>> associated item. 
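The PostgreSQL behavior quoted above can be demonstrated with a short session (a sketch against PostgreSQL 12, following the example from the cited documentation; table name is illustrative):

```sql
CREATE TABLE people (
  height_cm numeric,
  height_in numeric GENERATED ALWAYS AS (height_cm / 2.54) STORED
);

-- Rejected: a generated column cannot be written to directly.
INSERT INTO people (height_cm, height_in) VALUES (180, 70.9);

-- Allowed: the keyword DEFAULT may be specified for a generated column.
INSERT INTO people (height_cm, height_in) VALUES (180, DEFAULT);

-- Allowed: simply omit the generated column from the column list.
INSERT INTO people (height_cm) VALUES (180);
```

This is the semantics Leonard argues the proposed Flink `INSERT` example would violate if metadata were modeled as persisted computed columns.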
> >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> [1] > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>> > https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15 > >>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> < > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>> > https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15 > >>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> [2] > >>>>>>>>>>>>>> > >>>>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html > >>>>>>>>>>>>>>> < > >>>>>>>>>>>>>>>>>>>> > >>>>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> 在 2020年9月8日,17:31,Timo Walther < > >>>>>> twal...@apache.org> > >>>>>>>>>>>>>>>>>>>>>> 写道: > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Hi Jark, > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> according to Flink's and Calcite's > >>>>>> casting definition in > >>>>>>>>>> [1][2] > >>>>>>>>>>>>>>>>>>>> TIMESTAMP WITH LOCAL TIME ZONE should be > >>>>>> castable from BIGINT. > >>>>>>>>>> If > >>>>>>>>>>>>>>> not, > >>>>>>>>>>>>>>>>>> we > >>>>>>>>>>>>>>>>>>>> will make it possible ;-) > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> I'm aware of > >>>>>> DeserializationSchema.getProducedType but I > >>>>>>>>>>>>>>>>>>>>>> think > >>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>>> this method is actually misplaced. The type > >>>>>> should rather be > >>>>>>>>>>>>>> passed > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> source itself. 
> >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> For our Kafka SQL source, we will also > >>>>>> not use this method > >>>>>>>>>>>>>> because > >>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>> Kafka source will add own metadata in > >>>>>> addition to the > >>>>>>>>>>>>>>>>>>>> DeserializationSchema. So > >>>>>>>>>>>>>>>>>>>> DeserializationSchema.getProducedType > >>>>>>>>>>>>>> will > >>>>>>>>>>>>>>>>>> never > >>>>>>>>>>>>>>>>>>>> be read. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> For now I suggest to leave out the > >>>>>> `DataType` from > >>>>>>>>>>>>>>>>>>>> DecodingFormat.applyReadableMetadata. Also > >>>>>> because the > >>>>>>>>>>>>>>>>>>>> format's > >>>>>>>>>>>>>>>> physical > >>>>>>>>>>>>>>>>>>>> type is passed later in > >>>>>> `createRuntimeDecoder`. If > >>>>>>>>>>>>>>>>>>>> necessary, it > >>>>>>>>>>>>>> can > >>>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>>>>> computed manually by consumedType + metadata > >>>>>> types. We will > >>>>>>>>>>>>>> provide > >>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>>> metadata utility class for that. 
> >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>>>>>> Timo > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> [1] > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>> > https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200 > >>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> [2] > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>> > >>>>>> > https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254 > >>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> On 08.09.20 10:52, Jark Wu wrote: > >>>>>>>>>>>>>>>>>>>>>>> Hi Timo, > >>>>>>>>>>>>>>>>>>>>>>> The updated CAST SYSTEM_METADATA > >>>>>> behavior sounds good to > >>>>>>>>>>>>>>>>>>>>>>> me. > >>>>>>>>>> I > >>>>>>>>>>>>>>> just > >>>>>>>>>>>>>>>>>>>> noticed > >>>>>>>>>>>>>>>>>>>>>>> that a BIGINT can't be converted to > >>>>>> "TIMESTAMP(3) WITH > >>>>>>>>>>>>>>>>>>>>>>> LOCAL > >>>>>>>>>>>>>> TIME > >>>>>>>>>>>>>>>>>>>> ZONE". > >>>>>>>>>>>>>>>>>>>>>>> So maybe we need to support this, or > >>>>>> use "TIMESTAMP(3) WITH > >>>>>>>>>>>>>> LOCAL > >>>>>>>>>>>>>>>>>> TIME > >>>>>>>>>>>>>>>>>>>>>>> ZONE" as the defined type of Kafka > >>>>>> timestamp? I think this > >>>>>>>>>>>>>> makes > >>>>>>>>>>>>>>>>>> sense, > >>>>>>>>>>>>>>>>>>>>>>> because it represents the milli-seconds > >>>>>> since epoch. > >>>>>>>>>>>>>>>>>>>>>>> Regarding "DeserializationSchema > >>>>>> doesn't need TypeInfo", I > >>>>>>>>>>>>>> don't > >>>>>>>>>>>>>>>>>> think > >>>>>>>>>>>>>>>>>>>> so. 
The DeserializationSchema implements ResultTypeQueryable, thus the implementation needs to return an output TypeInfo. Besides, FlinkKafkaConsumer also calls DeserializationSchema.getProducedType as the produced type of the source function [1].

Best,
Jark

[1]: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066

On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org> wrote:
Hi everyone,

I updated the FLIP again and hope that I could address the mentioned concerns.

@Leonard: Thanks for the explanation. I wasn't aware that ts_ms and source.ts_ms have different semantics. I updated the FLIP and expose the most commonly used properties separately.
So frequently used properties are not hidden in the MAP anymore:

debezium-json.ingestion-timestamp
debezium-json.source.timestamp
debezium-json.source.database
debezium-json.source.schema
debezium-json.source.table

However, since other properties depend on the used connector/vendor, the remaining options are stored in:

debezium-json.source.properties

And accessed with:

CAST(SYSTEM_METADATA('debezium-json.source.properties') AS MAP<STRING, STRING>)['table']

Otherwise it is not possible to figure out the value and column type during validation.

@Jark: You convinced me in relaxing the CAST constraints.
I added a dedicated sub-section to the FLIP:

For making the use of SYSTEM_METADATA easier and to avoid nested casting, we allow explicit casting to a target data type:

rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3) WITH LOCAL TIME ZONE)

A connector still produces and consumes the data type returned by `listMetadata()`. The planner will insert necessary explicit casts.

In any case, the user must provide a CAST such that the computed column receives a valid data type when constructing the table schema.

"I don't see a reason why `DecodingFormat#applyReadableMetadata` needs a DataType argument."

Correct, the DeserializationSchema doesn't need TypeInfo; it is always executed locally. It is the source that needs TypeInfo for serializing the record to the next operator, and that's what we provide.
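The cast in the rowtime example above boils down to interpreting a BIGINT metadata value as milliseconds since epoch. A minimal Java sketch of that interpretation (illustrative only, not Flink's actual cast implementation):

```java
import java.time.Instant;

public class EpochMillisSketch {

    // Interprets a BIGINT metadata value (e.g. the Kafka record timestamp)
    // as milliseconds since epoch, the semantics behind casting it to
    // TIMESTAMP(3) WITH LOCAL TIME ZONE.
    public static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        // A Debezium-style ts_ms value resolves to a point in time in May 2020.
        System.out.println(toInstant(1589355504100L));
    }
}
```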
@Danny:

"`SYSTEM_METADATA("offset")` returns the NULL type by default"

We can also use some other means to represent an UNKNOWN data type. In the Flink type system, we use the NullType for it. The important part is that the final data type is known for the entire computed column. As I mentioned before, I would avoid the suggested option b) that would be similar to your suggestion. The CAST should be enough and allows for complex expressions in the computed column. Option b) would need parser changes.

Regards,
Timo

On 08.09.20 06:21, Leonard Xu wrote:
Hi, Timo

Thanks for your explanation and update, I have only one question for the latest FLIP.
About the MAP<STRING, STRING> DataType of the key 'debezium-json.source': if users want to use the table name metadata, they need to write:

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']

The expression is a little complex for users. Could we only support the necessary metadata with simple DataTypes, as follows?

tableName STRING AS CAST(SYSTEM_METADATA('debezium-json.source.table') AS STRING),
transactionTime LONG AS CAST(SYSTEM_METADATA('debezium-json.source.ts_ms') AS BIGINT),

In this way we can simplify the expression. The mainly used metadata in changelog formats may include 'database', 'table', 'source.ts_ms' and 'ts_ms' from my side; maybe we could only support them in a first version.

Both Debezium and Canal have the above four metadata fields, and I'm willing to take some subtasks in the next development if necessary.
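Leonard's comparison, one generic MAP lookup versus dedicated per-field keys, can be mimicked in plain Java; the map contents below are a hypothetical Debezium `source` section, not output of any real connector:

```java
import java.util.HashMap;
import java.util.Map;

public class MetadataMapSketch {

    // Mimics CAST(SYSTEM_METADATA('debezium-json.source') AS MAP<STRING, STRING>)['table']:
    // a generic string-to-string lookup, as opposed to a dedicated typed key.
    public static String lookup(Map<String, String> source, String field) {
        return source.get(field);
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("db", "inventory");
        source.put("table", "products");
        source.put("ts_ms", "1589355504100");
        System.out.println(lookup(source, "table")); // products
    }
}
```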
Debezium:

{
  "before": null,
  "after": { "id": 101, "name": "scooter" },
  "source": {
    "db": "inventory",        # 1. database name the changelog belongs to.
    "table": "products",      # 2. table name the changelog belongs to.
    "ts_ms": 1589355504100,   # 3. timestamp of the change in the database system, i.e. transaction time in the database.
    "connector": "mysql",
    ….
  },
  "ts_ms": 1589355606100,     # 4. timestamp when Debezium processed the changelog.
  "op": "c",
  "transaction": null
}

Canal:

{
  "data": [{ "id": "102", "name": "car battery" }],
  "database": "inventory",    # 1. database name the changelog belongs to.
  "table": "products",        # 2. table name the changelog belongs to.
  "es": 1589374013000,        # 3. execution time of the change in the database system, i.e. transaction time in the database.
  "ts": 1589374013680,        # 4. timestamp when Canal processed the changelog.
  "isDdl": false,
  "mysqlType": {},
  ....
}

Best
Leonard

On 8 Sep 2020, at 11:57, Danny Chan <yuzhao....@gmail.com> wrote:

Thanks Timo ~

The FLIP was already in pretty good shape, I have only 2 questions here:

1. "`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid read-only computed column for Kafka and can be extracted by the planner."

What are the pros of following the SQL Server syntax here? Usually an expression's return type can be inferred automatically. But I guess SQL Server does not have a function like SYSTEM_METADATA, which actually does not have a specific return type.
And why not use the Oracle or MySQL syntax there?

column_name [datatype] [GENERATED ALWAYS] AS (expression) [VIRTUAL]

which is more straight-forward.

2. "`SYSTEM_METADATA("offset")` returns the NULL type by default"

The default type should not be NULL, because only the NULL literal does that. Usually we use ANY as the type if we do not know the specific type in the SQL context. ANY means the physical value can be any Java object.

[1] https://oracle-base.com/articles/11g/virtual-columns-11gr1
[2] https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html

Best,
Danny Chan

On 4 Sep 2020, at 16:48 (+0800), Timo Walther <twal...@apache.org> wrote:
Hi everyone,

I completely reworked FLIP-107.
It now covers the full story of how to read and write metadata from/to connectors and formats. It considers all of the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It introduces the concept of PERSISTED computed columns and leaves out partitioning for now.

Looking forward to your feedback.

Regards,
Timo

On 04.03.20 09:45, Kurt Young wrote:
Sorry, forgot one question.

4. Can we make the value.fields-include more orthogonal? Like one can specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP". With the current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users cannot configure it to just ignore the timestamp but keep the key.
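Kurt's composable excludes could be modeled roughly as follows; the `Exclude` enum and the field-tagging convention are invented here for illustration and are not part of the FLIP:

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class FieldsIncludeSketch {

    public enum Exclude { KEY, TIMESTAMP }

    // Composable excludes: "EXCEPT_KEY, EXCEPT_TIMESTAMP" becomes a set of
    // flags, so ignoring the timestamp while keeping the key is expressible,
    // which a fixed EXCEPT_KEY_TIMESTAMP option cannot do.
    public static List<String> project(List<String> fields, Set<Exclude> excludes) {
        List<String> result = new ArrayList<>();
        for (String f : fields) {
            if (excludes.contains(Exclude.KEY) && f.startsWith("key.")) continue;
            if (excludes.contains(Exclude.TIMESTAMP) && f.equals("timestamp")) continue;
            result.add(f);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> fields = List.of("key.id", "timestamp", "name");
        // Ignore the timestamp but keep the key.
        System.out.println(project(fields, EnumSet.of(Exclude.TIMESTAMP)));
    }
}
```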
Best,
Kurt

On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
Hi Dawid,

I have a couple of questions around key fields; actually I also have some other questions but want to be focused on key fields first.

1. I don't fully understand the usage of "key.fields". Is this option only valid during the write operation? Because for reading, I can't imagine how such options can be applied. I would expect that there might be a SYSTEM_METADATA("key") to read and assign the key to a normal field?

2. If "key.fields" is only valid in the write operation, I want to propose that we simplify the options by not introducing key.format.type and other related options.
I think a single "key.field" (not fields) would be enough; users can use a UDF to calculate whatever key they want before the sink.

3. Also I don't want to introduce "value.format.type" and "value.format.xxx" with the "value" prefix. Not every connector has a concept of keys and values. The old parameter "format.type" is already good enough to use.

Best,
Kurt

On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com> wrote:
Thanks Dawid,

I have two more questions.

> SupportsMetadata

Introducing SupportsMetadata sounds good to me. But I have some questions regarding this interface.
1) How does the source know the expected return type of each metadata field?
2) Where to put the metadata fields?
Append them to the existing physical fields? If yes, I would suggest changing the signature to `TableSource appendMetadataFields(String[] metadataNames, DataType[] metadataTypes)`.

> SYSTEM_METADATA("partition")

Can the SYSTEM_METADATA() function be used nested in a computed column expression? If yes, how to specify the return type of SYSTEM_METADATA?

Best,
Jark

On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
Hi,

1. I thought a bit more on how the source would emit the columns, and I now see it's not exactly the same as regular columns. I see a need to elaborate a bit more on that in the FLIP as you asked, Jark.

I do agree mostly with Danny on how we should do that.
One additional thing I would introduce is an

interface SupportsMetadata {

  boolean supportsMetadata(Set<String> metadataFields);

  TableSource generateMetadataFields(Set<String> metadataFields);

}

This way the source would have to declare/emit only the requested metadata fields. In order not to clash with user-defined fields, when emitting a metadata field I would prepend the column name with __system_{property_name}. Therefore when SYSTEM_METADATA("partition") is requested, the source would append a field __system_partition to the schema. This would never be visible to the user, as it would be used only for the subsequent computed columns.
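Dawid's naming scheme for the internally appended fields is essentially a prefix rule; a trivial sketch (the method name is invented for illustration):

```java
public class SystemFieldNameSketch {

    // Prepends __system_ to a requested metadata property so that the
    // internally appended field cannot clash with user-defined columns.
    public static String internalFieldName(String property) {
        return "__system_" + property;
    }

    public static void main(String[] args) {
        // SYSTEM_METADATA("partition") leads to an appended __system_partition field.
        System.out.println(internalFieldName("partition")); // __system_partition
    }
}
```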
If that makes sense to you, I will update the FLIP with this description.

2. CAST vs explicit type in computed columns

Here I agree with Danny. It is also the current state of the proposal.

3. Partitioning on a computed column vs a function

Here I also agree with Danny. I also think those are orthogonal. I would leave the STORED computed columns out of the discussion; I don't see how they relate to partitioning. I already put both of those cases in the document. We can either partition on a computed column or use a UDF in a PARTITIONED BY clause. I am fine with leaving out partitioning by UDF in the first version if you still have some concerns.

As for your question, Danny.
It depends which partitioning strategy you use.

For the HASH partitioning strategy, I thought it would work as you explained: it would be N = MOD(expr, num). I am not sure though if we should introduce the PARTITIONS clause. Usually Flink does not own the data, and the partitions are already an intrinsic property of the underlying source; e.g. for Kafka we do not create topics, but just describe a pre-existing, pre-partitioned topic.

4. timestamp vs timestamp.field vs connector.field vs ...

I am fine with changing it to timestamp.field to be consistent with the other value.fields and key.fields. Actually that was also my initial proposal in a first draft I prepared.
I changed it afterwards to shorten the key.

Best,
Dawid

On 03/03/2020 09:00, Danny Chan wrote:
Thanks Dawid for bringing up this discussion, I think it is a useful feature ~

About how the metadata outputs from the source:

I think it is completely orthogonal; computed column push-down is another topic. This should not be a blocker but a promotion: if we do not have any filters on the computed column, there is no need to do any pushing. The source node just emits the complete record with full metadata with the declared physical schema; then when generating the virtual columns, we would extract the metadata info and output it as full columns (with the full schema).

About the type of the metadata column:

Personally I prefer an explicit type instead of CAST; they are semantically equivalent though. An explicit type is more straight-forward, and we can declare the nullable attribute there.

About option A (partitioning based on a computed column) vs option B (partitioning with just a function):

From the FLIP, it seems that B's partitioning is just a strategy when writing data; the partition column is not included in the table schema, so it's just useless when reading from it.
- Compared to A, we do not need to generate the partition column when selecting from the table (but we do on insert into).
- For A, we can also mark the column as STORED when we want to persist it.

So in my opinion they are orthogonal, and we can support both. I saw that MySQL/Oracle [1][2] would suggest to also define the PARTITIONS num, and the partitions are managed under a "table namespace"; the partition in which a record is stored is partition number N, where N = MOD(expr, num). For your design, which partition would the record persist in?
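Danny's N = MOD(expr, num) rule maps directly to a floor-mod in Java (floor-mod keeps the result non-negative even for negative expressions); this sketch is illustrative only:

```java
public class HashPartitionSketch {

    // N = MOD(expr, num): the partition number a record lands in under
    // MySQL/Oracle-style HASH partitioning.
    public static int partitionFor(long expr, int numPartitions) {
        return (int) Math.floorMod(expr, (long) numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(1589374013000L, 4)); // 0
        System.out.println(partitionFor(-7L, 4));            // 1
    }
}
```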
[1] https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
[2] https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270

Best,
Danny Chan

On 2 Mar 2020 at 18:16 (+0800), Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Jark,

Ad. 2: I added a section to discuss the relation to FLIP-63.

Ad. 3: Yes, I also tried to somewhat keep a hierarchy of properties; therefore you have the key.format.type. I also considered exactly what you are suggesting (prefixing with connector or kafka). I should've put that into a Rejected Alternatives section. I agree timestamp, key.*, value.* are connector properties. Why I wanted to suggest not adding that prefix in the first version is that actually all the properties in the WITH section are connector properties. Even format is in the end a connector property, as some of the sources might not have a format, imo. The benefit of not adding the prefix is that it makes the keys a bit shorter. Imagine prefixing all the properties with connector (or, if we go with FLINK-12557: elasticsearch):

elasticsearch.key.format.type: csv
elasticsearch.key.format.field: ....
elasticsearch.key.format.delimiter: ....
elasticsearch.key.format.*: ....

I am fine with doing it though, if this is the preferred approach in the community.

Ad the in-line comments: I forgot to update the `value.fields.include` property. It should be value.fields-include.
Which I think you also suggested in the comment, right?

As for CAST vs. declaring the output type of a computed column: I think it's better not to use CAST, but to declare the type of the expression and later on infer the output type of SYSTEM_METADATA. The reason is that I think this way it will be easier to implement e.g. filter push-downs when working with the native types of the source; e.g. in the case of Kafka's offset, I think it's better to push down a long rather than a string. This would let us push down an expression like `offset > 12345 AND offset < 59382`. Otherwise we would have to push down `cast(offset, long) > 12345 AND cast(offset, long) < 59382`.
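Dawid's push-down argument can be illustrated with a sketch. The SYSTEM_METADATA function follows the FLIP draft being discussed here (the thread later converged on a different `METADATA` syntax), and the table and connector options are hypothetical:

```sql
-- Declaring the metadata column with the source's native type (BIGINT for
-- Kafka's offset) means the planner can push predicates down as-is:
CREATE TABLE kafka_t (
  id STRING,
  `offset` BIGINT AS SYSTEM_METADATA('offset')
) WITH (
  'connector' = 'kafka'
);

-- With a declared BIGINT this filter can be pushed down on the native long;
-- with a string type it would need a cast around every comparison:
SELECT * FROM kafka_t WHERE `offset` > 12345 AND `offset` < 59382;
```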
Moreover, I think we need to introduce the type for computed columns anyway, to support functions that infer the output type based on the expected return type.

As for the computed column push-down: yes, SYSTEM_METADATA would have to be pushed down to the source. If that is not possible, the planner should fail. As far as I know, computed column push-down will be part of the source rework, won't it? ;)

As for the persisted computed column: I think it is completely orthogonal. In my current proposal you can also partition by a computed column. The difference between using a UDF in PARTITIONED BY vs. partitioning by a computed column is that when you partition by a computed column, this column must also be computed when reading the table. If you use a UDF in the PARTITIONED BY, the expression is computed only when inserting into the table.

Hope this answers some of your questions. Looking forward to further suggestions.

Best,
Dawid

On 02/03/2020 05:18, Jark Wu wrote:

Hi,

Thanks Dawid for starting such a great discussion. Reading metadata and key-part information from the source is an important feature for streaming users.
In general, I agree with the proposal of the FLIP. I will leave my thoughts and comments here:

1) +1 to use connector properties instead of introducing a HEADER keyword, for the reason you mentioned in the FLIP.

2) We already introduced PARTITIONED BY in FLIP-63. Maybe we should add a section to explain the relationship between them. Do their concepts conflict? Could INSERT PARTITION be used on the PARTITIONED table in this FLIP?

3) Currently, properties are hierarchical in Flink SQL. Shall we make the newly introduced properties more hierarchical? For example, "timestamp" => "connector.timestamp"? (Actually, I prefer "kafka.timestamp", which is another improvement for properties, FLINK-12557.) A single "timestamp" in the properties may mislead users into thinking the field is a rowtime attribute.

I also left some minor comments in the FLIP.

Thanks,
Jark

On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi,

I would like to propose an improvement that would enable reading table columns from different parts of source records. Besides the main payload, the majority (if not all) of the sources expose additional information.
It > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can be simply a > >>>>>> read-only metadata such as > >>>>>>>>>> offset, > >>>>>>>>>>>>>>>>>>>> ingestion > >>>>>>>>>>>>>>>>>>>>>>>> time > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or a > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> read and write > >>>>>> parts of the record that contain > >>>>>>>>>>>>>> data > >>>>>>>>>>>>>>>> but > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> additionally > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serve different > >>>>>> purposes (partitioning, > >>>>>>>>>> compaction > >>>>>>>>>>>>>>>> etc.), > >>>>>>>>>>>>>>>>>>>> e.g. > >>>>>>>>>>>>>>>>>>>>>>>> key > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamp in > >>>>>> Kafka. > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We should make > >>>>>> it possible to read and write > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data > >>>>>>>>>>>>>>> from > >>>>>>>>>>>>>>>>>> all > >>>>>>>>>>>>>>>>>>>> of > >>>>>>>>>>>>>>>>>>>>>>>> those > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> locations. In > >>>>>> this proposal I discuss reading > >>>>>>>>>>>>>>>>>> partitioning > >>>>>>>>>>>>>>>>>>>>>>>> data, > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> completeness > >>>>>> this proposal discusses also the > >>>>>>>>>>>>>>>>>> partitioning > >>>>>>>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> writing > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data out. > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am looking > >>>>>> forward to your comments. 
You can access the FLIP here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode

Best,
Dawid