Thanks Timo,

The updates to `Schema` and LIKE clause looks good to me.

Best,
Jark

On Tue, 15 Sep 2020 at 10:30, Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Timo
>
> Thanks for your explanation, it makes sense to me.
>
> Best,
> Leonard
>
>
> >> Hi, Timo
> >> Thanks for the update
> >> I have a minor suggestion about the debezium metadata key,
> >> Could we use the original  debezium key rather than import new key?
> >> debezium-json.schema                            => debezium-json.schema
> >> debezium-json.ingestion-timestamp  =>  debezium-json.ts_ms
> >> debezium-json.source.database       =>  debezium-json.source.db
> >> debezium-json.source.schema         =>  debezium-json.source.schema
> >> debezium-json.source.table              =>  debezium-json.source.table
> >> debezium-json.source.timestamp     =>  debezium-json.source.ts_ms
> >> debezium-json.source.properties      =>  debezium-json.source
> MAP<STRING, STRING>
> >>  User who familiar with debezium will understand the key easier,  and
> the key syntax is more json-path like. HDYT?
> >> The other part looks really good to me.
> >> Regards,
> >> Leonard
> >>> 在 2020年9月10日,18:26,Aljoscha Krettek <aljos...@apache.org> 写道:
> >>>
> >>> I've only been watching this from the sidelines but that latest
> proposal looks very good to me!
> >>>
> >>> Aljoscha
> >>>
> >>> On 10.09.20 12:20, Kurt Young wrote:
> >>>> The new syntax looks good to me.
> >>>> Best,
> >>>> Kurt
> >>>> On Thu, Sep 10, 2020 at 5:57 PM Jark Wu <imj...@gmail.com> wrote:
> >>>>> Hi Timo,
> >>>>>
> >>>>> I have one minor suggestion.
> >>>>> Maybe the default data type of `timestamp`  can be `TIMESTAMP(3) WITH
> >>>>> LOCAL TIME ZONE`, because this is the type that users want to use,
> this can
> >>>>> avoid unnecessary casting.
> >>>>> Besides, currently, the bigint is casted to timestamp in seconds, so
> the
> >>>>> implicit cast may not work...
> >>>>>
> >>>>> I don't have other objections. But maybe we should wait for the
> >>>>> opinion from @Kurt for the new syntax.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>>
> >>>>> On Thu, 10 Sep 2020 at 16:21, Danny Chan <yuzhao....@gmail.com>
> wrote:
> >>>>>
> >>>>>> Thanks for driving this Timo, +1 for voting ~
> >>>>>>
> >>>>>> Best,
> >>>>>> Danny Chan
> >>>>>> 在 2020年9月10日 +0800 PM3:47,Timo Walther <twal...@apache.org>,写道:
> >>>>>>> Thanks everyone for this healthy discussion. I updated the FLIP
> with the
> >>>>>>> outcome. I think the result is very powerful but also very easy to
> >>>>>>> declare. Thanks for all the contributions.
> >>>>>>>
> >>>>>>> If there are no objections, I would continue with a voting.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>>
> >>>>>>> On 09.09.20 16:52, Timo Walther wrote:
> >>>>>>>> "If virtual by default, when a user types "timestamp int" ==>
> >>>>>> persisted
> >>>>>>>> column, then adds a "metadata" after that ==> virtual column, then
> >>>>>> adds
> >>>>>>>> a "persisted" after that ==> persisted column."
> >>>>>>>>
> >>>>>>>> Thanks for this nice mental model explanation, Jark. This makes
> total
> >>>>>>>> sense to me. Also making the the most common case as short at just
> >>>>>>>> adding `METADATA` is a very good idea. Thanks, Danny!
> >>>>>>>>
> >>>>>>>> Let me update the FLIP again with all these ideas.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 09.09.20 15:03, Jark Wu wrote:
> >>>>>>>>> I'm also +1 to Danny's proposal: timestamp INT METADATA [FROM
> >>>>>>>>> 'my-timestamp-field'] [VIRTUAL]
> >>>>>>>>> Especially I like the shortcut: timestamp INT METADATA, this
> makes
> >>>>>> the
> >>>>>>>>> most
> >>>>>>>>> common case to be supported in the simplest way.
> >>>>>>>>>
> >>>>>>>>> I also think the default should be "PERSISTED", so VIRTUAL is
> >>>>>> optional
> >>>>>>>>> when
> >>>>>>>>> you are accessing a read-only metadata. Because:
> >>>>>>>>> 1. The "timestamp INT METADATA" should be a normal column,
> because
> >>>>>>>>> "METADATA" is just a modifier to indicate it is from metadata, a
> >>>>>> normal
> >>>>>>>>> column should be persisted.
> >>>>>>>>>      If virtual by default, when a user types "timestamp int" ==>
> >>>>>>>>> persisted
> >>>>>>>>> column, then adds a "metadata" after that ==> virtual column,
> then
> >>>>>> adds a
> >>>>>>>>> "persisted" after that ==> persisted column.
> >>>>>>>>>      I think this looks reversed several times and makes users
> >>>>>> confused.
> >>>>>>>>> Physical fields are also prefixed with "fieldName TYPE", so
> >>>>>> "timestamp
> >>>>>>>>> INT
> >>>>>>>>> METADATA" is persisted is very straightforward.
> >>>>>>>>> 2. From the collected user question [1], we can see that
> "timestamp"
> >>>>>>>>> is the
> >>>>>>>>> most common use case. "timestamp" is a read-write metadata.
> >>>>>> Persisted by
> >>>>>>>>> default doesn't break the reading behavior.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Jark
> >>>>>>>>>
> >>>>>>>>> [1]: https://issues.apache.org/jira/browse/FLINK-15869
> >>>>>>>>>
> >>>>>>>>> On Wed, 9 Sep 2020 at 20:56, Leonard Xu <xbjt...@gmail.com>
> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks @Dawid for the nice summary, I think you catch all
> >>>>>> opinions of
> >>>>>>>>>> the
> >>>>>>>>>> long discussion well.
> >>>>>>>>>>
> >>>>>>>>>> @Danny
> >>>>>>>>>> “ timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]
> >>>>>>>>>>   Note that the "FROM 'field name'" is only needed when the name
> >>>>>>>>>> conflict
> >>>>>>>>>>   with the declared table column name, when there are no
> >>>>>> conflicts,
> >>>>>>>>>> we can
> >>>>>>>>>> simplify it to
> >>>>>>>>>>        timestamp INT METADATA"
> >>>>>>>>>>
> >>>>>>>>>> I really like the proposal, there is no confusion with computed
> >>>>>>>>>> column any
> >>>>>>>>>> more,  and it’s concise enough.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> @Timo @Dawid
> >>>>>>>>>> “We use `SYSTEM_TIME` for temporal tables. I think prefixing
> with
> >>>>>> SYSTEM
> >>>>>>>>>> makes it clearer that it comes magically from the system.”
> >>>>>>>>>> “As for the issue of shortening the SYSTEM_METADATA to METADATA.
> >>>>>> Here I
> >>>>>>>>>> very much prefer the SYSTEM_ prefix.”
> >>>>>>>>>>
> >>>>>>>>>> I think `SYSTEM_TIME` is different with `SYSTEM_METADATA ` a
> lot,
> >>>>>>>>>> First of all,  the word `TIME` has broad meanings but the word
> >>>>>>>>>> `METADATA `
> >>>>>>>>>> not,  `METADATA ` has specific meaning,
> >>>>>>>>>> Secondly, `FOR SYSTEM_TIME AS OF` exists in SQL standard but
> >>>>>>>>>> `SYSTEM_METADATA ` not.
> >>>>>>>>>> Personally, I like more simplify way,sometimes  less is more.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Leonard
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Timo Walther <twal...@apache.org> 于2020年9月9日周三 下午6:41写道:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> "key" and "value" in the properties are a special case
> >>>>>> because they
> >>>>>>>>>>>> need
> >>>>>>>>>>>> to configure a format. So key and value are more than just
> >>>>>> metadata.
> >>>>>>>>>>>> Jark's example for setting a timestamp would work but as the
> >>>>>> FLIP
> >>>>>>>>>>>> discusses, we have way more metadata fields like headers,
> >>>>>>>>>>>> epoch-leader,
> >>>>>>>>>>>> etc. Having a property for all of this metadata would mess up
> >>>>>> the WITH
> >>>>>>>>>>>> section entirely. Furthermore, we also want to deal with
> >>>>>> metadata from
> >>>>>>>>>>>> the formats. Solving this through properties as well would
> >>>>>> further
> >>>>>>>>>>>> complicate the property design.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Personally, I still like the computed column design more
> >>>>>> because it
> >>>>>>>>>>>> allows to have full flexibility to compute the final column:
> >>>>>>>>>>>>
> >>>>>>>>>>>> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS
> >>>>>>>>>> TIMESTAMP(3)))
> >>>>>>>>>>>>
> >>>>>>>>>>>> Instead of having a helper column and a real column in the
> >>>>>> table:
> >>>>>>>>>>>>
> >>>>>>>>>>>> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
> >>>>>>>>>>>> realTimestamp AS adjustTimestamp(helperTimestamp)
> >>>>>>>>>>>>
> >>>>>>>>>>>> But I see that the discussion leans towards:
> >>>>>>>>>>>>
> >>>>>>>>>>>> timestamp INT SYSTEM_METADATA("ts")
> >>>>>>>>>>>>
> >>>>>>>>>>>> Which is fine with me. It is the shortest solution, because
> >>>>>> we don't
> >>>>>>>>>>>> need additional CAST. We can discuss the syntax, so that
> >>>>>> confusion
> >>>>>>>>>>>> with
> >>>>>>>>>>>> computed columns can be avoided.
> >>>>>>>>>>>>
> >>>>>>>>>>>> timestamp INT USING SYSTEM_METADATA("ts")
> >>>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts")
> >>>>>>>>>>>> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
> >>>>>>>>>>>>
> >>>>>>>>>>>> We use `SYSTEM_TIME` for temporal tables. I think prefixing
> >>>>>> with
> >>>>>>>>>>>> SYSTEM
> >>>>>>>>>>>> makes it clearer that it comes magically from the system.
> >>>>>>>>>>>>
> >>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 09.09.20 11:41, Jark Wu wrote:
> >>>>>>>>>>>>> Hi Danny,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> This is not Oracle and MySQL computed column syntax,
> >>>>>> because there is
> >>>>>>>>>> no
> >>>>>>>>>>>>> "AS" after the type.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> If we want to use "offset INT SYSTEM_METADATA("offset")",
> >>>>>> then I
> >>>>>>>>>>>>> think
> >>>>>>>>>> we
> >>>>>>>>>>>>> must further discuss about "PERSISED" or "VIRTUAL" keyword
> >>>>>> for
> >>>>>>>>>> query-sink
> >>>>>>>>>>>>> schema problem.
> >>>>>>>>>>>>> Personally, I think we can use a shorter keyword "METADATA"
> >>>>>> for
> >>>>>>>>>>>>> "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a
> >>>>>> system
> >>>>>>>>>>>> function
> >>>>>>>>>>>>> and confuse users this looks like a computed column.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, 9 Sep 2020 at 17:23, Danny Chan <
> >>>>>> danny0...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> "offset INT SYSTEM_METADATA("offset")"
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This is actually Oracle or MySQL style computed column
> >>>>>> syntax.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> "You are right that one could argue that "timestamp",
> >>>>>> "headers" are
> >>>>>>>>>>>>>> something like "key" and "value""
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I have the same feeling, both key value and headers
> >>>>>> timestamp are
> >>>>>>>>>> *real*
> >>>>>>>>>>>>>> data
> >>>>>>>>>>>>>> stored in the consumed record, they are not computed or
> >>>>>> generated.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> "Trying to solve everything via properties sounds rather
> >>>>>> like a hack
> >>>>>>>>>> to
> >>>>>>>>>>>>>> me"
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Things are not that hack if we can unify the routines or
> >>>>>> the
> >>>>>>>>>> definitions
> >>>>>>>>>>>>>> (all from the computed column way or all from the table
> >>>>>> options), i
> >>>>>>>>>> also
> >>>>>>>>>>>>>> think that it is a hacky that we mix in 2 kinds of syntax
> >>>>>> for
> >>>>>>>>>> different
> >>>>>>>>>>>>>> kinds of metadata (read-only and read-write). In this
> >>>>>> FLIP, we
> >>>>>>>>>>>>>> declare
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>> Kafka key fields with table options but SYSTEM_METADATA
> >>>>>> for other
> >>>>>>>>>>>> metadata,
> >>>>>>>>>>>>>> that is a hacky thing or something in-consistent.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Kurt Young <ykt...@gmail.com> 于2020年9月9日周三 下午4:48写道:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>   I would vote for `offset INT
> >>>>>> SYSTEM_METADATA("offset")`.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I don't think we can stick with the SQL standard in DDL
> >>>>>> part
> >>>>>>>>>>>>>>> forever,
> >>>>>>>>>>>>>>> especially as there are more and more
> >>>>>>>>>>>>>>> requirements coming from different connectors and
> >>>>>> external systems.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Kurt
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <
> >>>>>> twal...@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Jark,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> now we are back at the original design proposed by
> >>>>>> Dawid :D
> >>>>>>>>>>>>>>>> Yes, we
> >>>>>>>>>>>>>>>> should be cautious about adding new syntax. But the
> >>>>>> length of this
> >>>>>>>>>>>>>>>> discussion shows that we are looking for a good
> >>>>>> long-term
> >>>>>>>>>>>>>>>> solution.
> >>>>>>>>>> In
> >>>>>>>>>>>>>>>> this case I would rather vote for a deep integration
> >>>>>> into the
> >>>>>>>>>> syntax.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Computed columns are also not SQL standard compliant.
> >>>>>> And our
> >>>>>>>>>>>>>>>> DDL is
> >>>>>>>>>>>>>>>> neither, so we have some degree of freedom here.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Trying to solve everything via properties sounds
> >>>>>> rather like a
> >>>>>>>>>>>>>>>> hack
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>> me. You are right that one could argue that
> >>>>>> "timestamp", "headers"
> >>>>>>>>>> are
> >>>>>>>>>>>>>>>> something like "key" and "value". However, mixing
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> `offset AS SYSTEM_METADATA("offset")`
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> `'timestamp.field' = 'ts'`
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> looks more confusing to users that an explicit
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> `offset INT SYSTEM_METADATA("offset")`
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> that is symetric for both source and sink.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> What do others think?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 09.09.20 10:09, Jark Wu wrote:
> >>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I think we have a conclusion that the writable
> >>>>>> metadata shouldn't
> >>>>>>>>>> be
> >>>>>>>>>>>>>>>>> defined as a computed column, but a normal column.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> "timestamp STRING SYSTEM_METADATA('timestamp')" is
> >>>>>> one of the
> >>>>>>>>>>>>>>> approaches.
> >>>>>>>>>>>>>>>>> However, it is not SQL standard compliant, we need
> >>>>>> to be cautious
> >>>>>>>>>>>>>>> enough
> >>>>>>>>>>>>>>>>> when adding new syntax.
> >>>>>>>>>>>>>>>>> Besides, we have to introduce the `PERSISTED` or
> >>>>>> `VIRTUAL`
> >>>>>>>>>>>>>>>>> keyword
> >>>>>>>>>> to
> >>>>>>>>>>>>>>>>> resolve the query-sink schema problem if it is
> >>>>>> read-only
> >>>>>>>>>>>>>>>>> metadata.
> >>>>>>>>>>>>>> That
> >>>>>>>>>>>>>>>>> adds more stuff to learn for users.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>  From my point of view, the "timestamp",
> >>>>>> "headers" are something
> >>>>>>>>>> like
> >>>>>>>>>>>>>>>> "key"
> >>>>>>>>>>>>>>>>> and "value" that stores with the real data. So why
> >>>>>> not define the
> >>>>>>>>>>>>>>>>> "timestamp" in the same way with "key" by using a
> >>>>>>>>>>>>>>>>> "timestamp.field"
> >>>>>>>>>>>>>>>>> connector option?
> >>>>>>>>>>>>>>>>> On the other side, the read-only metadata, such as
> >>>>>> "offset",
> >>>>>>>>>>>>>> shouldn't
> >>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>> defined as a normal column. So why not use the
> >>>>>> existing computed
> >>>>>>>>>>>>>> column
> >>>>>>>>>>>>>>>>> syntax for such metadata? Then we don't have the
> >>>>>> query-sink
> >>>>>>>>>>>>>>>>> schema
> >>>>>>>>>>>>>>>> problem.
> >>>>>>>>>>>>>>>>> So here is my proposal:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
> >>>>>>>>>>>>>>>>>     id BIGINT,
> >>>>>>>>>>>>>>>>>     name STRING,
> >>>>>>>>>>>>>>>>>     col1 STRING,
> >>>>>>>>>>>>>>>>>     col2 STRING,
> >>>>>>>>>>>>>>>>>     ts TIMESTAMP(3) WITH LOCAL TIME ZONE,    -- ts
> >>>>>> is a normal
> >>>>>>>>>> field,
> >>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>> be read and written.
> >>>>>>>>>>>>>>>>>     offset AS SYSTEM_METADATA("offset")
> >>>>>>>>>>>>>>>>> ) WITH (
> >>>>>>>>>>>>>>>>>     'connector' = 'kafka',
> >>>>>>>>>>>>>>>>>     'topic' = 'test-topic',
> >>>>>>>>>>>>>>>>>     'key.fields' = 'id, name',
> >>>>>>>>>>>>>>>>>     'key.format' = 'csv',
> >>>>>>>>>>>>>>>>>     'value.format' = 'avro',
> >>>>>>>>>>>>>>>>>     'timestamp.field' = 'ts'    -- define the
> >>>>>> mapping of Kafka
> >>>>>>>>>>>>>> timestamp
> >>>>>>>>>>>>>>>>> );
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> INSERT INTO kafka_table
> >>>>>>>>>>>>>>>>> SELECT id, name, col1, col2, rowtime FROM
> >>>>>> another_table;
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I think this can solve all the problems without
> >>>>>> introducing
> >>>>>>>>>>>>>>>>> any new
> >>>>>>>>>>>>>>>> syntax.
> >>>>>>>>>>>>>>>>> The only minor disadvantage is that we separate the
> >>>>>> definition
> >>>>>>>>>>>>>>> way/syntax
> >>>>>>>>>>>>>>>>> of read-only metadata and read-write fields.
> >>>>>>>>>>>>>>>>> However, I don't think this is a big problem.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, 9 Sep 2020 at 15:09, Timo Walther <
> >>>>>> twal...@apache.org>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Kurt,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> thanks for sharing your opinion. I'm totally up
> >>>>>> for not reusing
> >>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>> columns. I think Jark was a big supporter of this
> >>>>>> syntax, @Jark
> >>>>>>>>>> are
> >>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> fine with this as well? The non-computed column
> >>>>>> approach was
> >>>>>>>>>>>>>>>>>> only
> >>>>>>>>>> a
> >>>>>>>>>>>>>>>>>> "slightly rejected alternative".
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Furthermore, we would need to think about how
> >>>>>> such a new design
> >>>>>>>>>>>>>>>>>> influences the LIKE clause though.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> However, we should still keep the `PERSISTED`
> >>>>>> keyword as it
> >>>>>>>>>>>>>> influences
> >>>>>>>>>>>>>>>>>> the query->sink schema. If you look at the list
> >>>>>> of metadata for
> >>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>> connectors and formats, we currently offer only
> >>>>>> two writable
> >>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>>> fields. Otherwise, one would need to declare two
> >>>>>> tables
> >>>>>>>>>>>>>>>>>> whenever a
> >>>>>>>>>>>>>>>>>> metadata columns is read (one for the source, one
> >>>>>> for the sink).
> >>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>> can be quite inconvientient e.g. for just reading
> >>>>>> the topic.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 09.09.20 08:52, Kurt Young wrote:
> >>>>>>>>>>>>>>>>>>> I also share the concern that reusing the
> >>>>>> computed column
> >>>>>>>>>>>>>>>>>>> syntax
> >>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>> different semantics
> >>>>>>>>>>>>>>>>>>> would confuse users a lot.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Besides, I think metadata fields are
> >>>>>> conceptually not the same
> >>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>> computed columns. The metadata
> >>>>>>>>>>>>>>>>>>> field is a connector specific thing and it only
> >>>>>> contains the
> >>>>>>>>>>>>>>>> information
> >>>>>>>>>>>>>>>>>>> that where does the field come
> >>>>>>>>>>>>>>>>>>> from (during source) or where does the field
> >>>>>> need to write to
> >>>>>>>>>>>>>> (during
> >>>>>>>>>>>>>>>>>>> sink). It's more similar with normal
> >>>>>>>>>>>>>>>>>>> fields, with assumption that all these fields
> >>>>>> need going to the
> >>>>>>>>>>>>>> data
> >>>>>>>>>>>>>>>>>> part.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Thus I'm more lean to the rejected alternative
> >>>>>> that Timo
> >>>>>>>>>> mentioned.
> >>>>>>>>>>>>>>>> And I
> >>>>>>>>>>>>>>>>>>> think we don't need the
> >>>>>>>>>>>>>>>>>>> PERSISTED keyword, SYSTEM_METADATA should be
> >>>>>> enough.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> During implementation, the framework only needs
> >>>>>> to pass such
> >>>>>>>>>>>>>> <field,
> >>>>>>>>>>>>>>>>>>> metadata field> information to the
> >>>>>>>>>>>>>>>>>>> connector, and the logic of handling such
> >>>>>> fields inside the
> >>>>>>>>>>>>>> connector
> >>>>>>>>>>>>>>>>>>> should be straightforward.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Regarding the downside Timo mentioned:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The disadvantage is that users cannot call
> >>>>>> UDFs or parse
> >>>>>>>>>>>>>> timestamps.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I think this is fairly simple to solve. Since
> >>>>>> the metadata
> >>>>>>>>>>>>>>>>>>> field
> >>>>>>>>>>>>>>> isn't
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>> computed column anymore, we can support
> >>>>>>>>>>>>>>>>>>> referencing such fields in the computed column.
> >>>>>> For example:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
> >>>>>>>>>>>>>>>>>>>         id BIGINT,
> >>>>>>>>>>>>>>>>>>>         name STRING,
> >>>>>>>>>>>>>>>>>>>         timestamp STRING
> >>>>>> SYSTEM_METADATA("timestamp"),  //
> >>>>>>>>>>>>>>>>>>> get the
> >>>>>>>>>>>>>>>>>> timestamp
> >>>>>>>>>>>>>>>>>>> field from metadata
> >>>>>>>>>>>>>>>>>>>         ts AS to_timestamp(timestamp) // normal
> >>>>>> computed
> >>>>>>>>>>>>>>>>>>> column,
> >>>>>>>>>>>>>> parse
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> string to TIMESTAMP type by using the metadata
> >>>>>> field
> >>>>>>>>>>>>>>>>>>> ) WITH (
> >>>>>>>>>>>>>>>>>>>        ...
> >>>>>>>>>>>>>>>>>>> )
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>> Kurt
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther
> >>>>>>>>>>>>>>>>>>> <twal...@apache.org
> >>>>>>>>>>>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Leonard,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> the only alternative I see is that we
> >>>>>> introduce a concept that
> >>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>> completely different to computed columns.
> >>>>>> This is also
> >>>>>>>>>>>>>>>>>>>> mentioned
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> rejected alternative section of the FLIP.
> >>>>>> Something like:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
> >>>>>>>>>>>>>>>>>>>>         id BIGINT,
> >>>>>>>>>>>>>>>>>>>>         name STRING,
> >>>>>>>>>>>>>>>>>>>>         timestamp INT
> >>>>>> SYSTEM_METADATA("timestamp") PERSISTED,
> >>>>>>>>>>>>>>>>>>>>         headers MAP<STRING, BYTES>
> >>>>>> SYSTEM_METADATA("headers")
> >>>>>>>>>>>>>>> PERSISTED
> >>>>>>>>>>>>>>>>>>>> ) WITH (
> >>>>>>>>>>>>>>>>>>>>        ...
> >>>>>>>>>>>>>>>>>>>> )
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> This way we would avoid confusion at all and
> >>>>>> can easily map
> >>>>>>>>>>>>>> columns
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> metadata columns. The disadvantage is that
> >>>>>> users cannot call
> >>>>>>>>>> UDFs
> >>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>> parse timestamps. This would need to be done
> >>>>>> in a real
> >>>>>>>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>> column.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I'm happy about better alternatives.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On 08.09.20 15:37, Leonard Xu wrote:
> >>>>>>>>>>>>>>>>>>>>> HI, Timo
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Thanks for driving this FLIP.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Sorry but I have a concern about Writing
> >>>>>> metadata via
> >>>>>>>>>>>>>>>> DynamicTableSink
> >>>>>>>>>>>>>>>>>>>> section:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> CREATE TABLE kafka_table (
> >>>>>>>>>>>>>>>>>>>>>       id BIGINT,
> >>>>>>>>>>>>>>>>>>>>>       name STRING,
> >>>>>>>>>>>>>>>>>>>>>       timestamp AS
> >>>>>> CAST(SYSTEM_METADATA("timestamp") AS
> >>>>>>>>>>>>>>>>>>>>> BIGINT)
> >>>>>>>>>>>>>>>>>> PERSISTED,
> >>>>>>>>>>>>>>>>>>>>>       headers AS
> >>>>>> CAST(SYSTEM_METADATA("headers") AS
> >>>>>>>>>>>>>>>>>>>>> MAP<STRING,
> >>>>>>>>>>>>>>>> BYTES>)
> >>>>>>>>>>>>>>>>>>>> PERSISTED
> >>>>>>>>>>>>>>>>>>>>> ) WITH (
> >>>>>>>>>>>>>>>>>>>>>       ...
> >>>>>>>>>>>>>>>>>>>>> )
> >>>>>>>>>>>>>>>>>>>>> An insert statement could look like:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> INSERT INTO kafka_table VALUES (
> >>>>>>>>>>>>>>>>>>>>>       (1, "ABC", 1599133672, MAP('checksum',
> >>>>>>>>>>>>>> computeChecksum(...)))
> >>>>>>>>>>>>>>>>>>>>> )
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> The proposed INERT syntax does not make
> >>>>>> sense to me,
> >>>>>>>>>>>>>>>>>>>>> because it
> >>>>>>>>>>>>>>>>>> contains
> >>>>>>>>>>>>>>>>>>>> computed(generated) column.
> >>>>>>>>>>>>>>>>>>>>> Both SQL server and Postgresql do not allow
> >>>>>> to insert
> >>>>>>>>>>>>>>>>>>>>> value to
> >>>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>> columns even they are persisted, this boke
> >>>>>> the generated
> >>>>>>>>>>>>>>>>>>>> column
> >>>>>>>>>>>>>>>>>> semantics
> >>>>>>>>>>>>>>>>>>>> and may confuse user much.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> For SQL server computed column[1]:
> >>>>>>>>>>>>>>>>>>>>>> column_name AS computed_column_expression
> >>>>>> [ PERSISTED [ NOT
> >>>>>>>>>>>>>> NULL ]
> >>>>>>>>>>>>>>>>>> ]...
> >>>>>>>>>>>>>>>>>>>>>> NOTE: A computed column cannot be the
> >>>>>> target of an INSERT or
> >>>>>>>>>>>>>>> UPDATE
> >>>>>>>>>>>>>>>>>>>> statement.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> For Postgresql generated column[2]:
> >>>>>>>>>>>>>>>>>>>>>>      height_in numeric GENERATED ALWAYS
> >>>>>> AS (height_cm /
> >>>>>>>>>>>>>>>>>>>>>> 2.54)
> >>>>>>>>>>>>>>> STORED
> >>>>>>>>>>>>>>>>>>>>>> NOTE: A generated column cannot be
> >>>>>> written to directly. In
> >>>>>>>>>>>>>> INSERT
> >>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>> UPDATE commands, a value cannot be specified
> >>>>>> for a generated
> >>>>>>>>>>>>>> column,
> >>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>> the keyword DEFAULT may be specified.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> It shouldn't be allowed to set/update value
> >>>>>> for generated
> >>>>>>>>>> column
> >>>>>>>>>>>>>>>> after
> >>>>>>>>>>>>>>>>>>>> lookup the SQL 2016:
> >>>>>>>>>>>>>>>>>>>>>> <insert statement> ::=
> >>>>>>>>>>>>>>>>>>>>>> INSERT INTO <insertion target> <insert
> >>>>>> columns and source>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> If <contextually typed table value
> >>>>>> constructor> CTTVC is
> >>>>>>>>>>>>>>> specified,
> >>>>>>>>>>>>>>>>>>>> then every <contextually typed row
> >>>>>>>>>>>>>>>>>>>>>> value constructor element> simply
> >>>>>> contained in CTTVC whose
> >>>>>>>>>>>>>>>>>> positionally
> >>>>>>>>>>>>>>>>>>>> corresponding <column name>
> >>>>>>>>>>>>>>>>>>>>>> in <insert column list> references a
> >>>>>> column of which some
> >>>>>>>>>>>>>>> underlying
> >>>>>>>>>>>>>>>>>>>> column is a generated column shall
> >>>>>>>>>>>>>>>>>>>>>> be a <default specification>.
> >>>>>>>>>>>>>>>>>>>>>> A <default specification> specifies the
> >>>>>> default value of
> >>>>>>>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>> associated item.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> >>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> >>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>
> >>>>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html
> >>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>>>>>>>
> >>>>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 在 2020年9月8日,17:31,Timo Walther <
> >>>>>> twal...@apache.org>
> >>>>>>>>>>>>>>>>>>>>>> 写道:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Hi Jark,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> according to Flink's and Calcite's
> >>>>>> casting definition in
> >>>>>>>>>> [1][2]
> >>>>>>>>>>>>>>>>>>>> TIMESTAMP WITH LOCAL TIME ZONE should be
> >>>>>> castable from BIGINT.
> >>>>>>>>>> If
> >>>>>>>>>>>>>>> not,
> >>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>> will make it possible ;-)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I'm aware of
> >>>>>> DeserializationSchema.getProducedType but I
> >>>>>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> this method is actually misplaced. The type
> >>>>>> should rather be
> >>>>>>>>>>>>>> passed
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> source itself.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For our Kafka SQL source, we will also
> >>>>>> not use this method
> >>>>>>>>>>>>>> because
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> Kafka source will add own metadata in
> >>>>>> addition to the
> >>>>>>>>>>>>>>>>>>>> DeserializationSchema. So
> >>>>>>>>>>>>>>>>>>>> DeserializationSchema.getProducedType
> >>>>>>>>>>>>>> will
> >>>>>>>>>>>>>>>>>> never
> >>>>>>>>>>>>>>>>>>>> be read.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> For now I suggest to leave out the
> >>>>>> `DataType` from
> >>>>>>>>>>>>>>>>>>>> DecodingFormat.applyReadableMetadata. Also
> >>>>>> because the
> >>>>>>>>>>>>>>>>>>>> format's
> >>>>>>>>>>>>>>>> physical
> >>>>>>>>>>>>>>>>>>>> type is passed later in
> >>>>>> `createRuntimeDecoder`. If
> >>>>>>>>>>>>>>>>>>>> necessary, it
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>> computed manually by consumedType + metadata
> >>>>>> types. We will
> >>>>>>>>>>>>>> provide
> >>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> metadata utility class for that.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> >>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> >>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On 08.09.20 10:52, Jark Wu wrote:
> >>>>>>>>>>>>>>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>>>>>>>>>>>>> The updated CAST SYSTEM_METADATA
> >>>>>> behavior sounds good to
> >>>>>>>>>>>>>>>>>>>>>>> me.
> >>>>>>>>>> I
> >>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>>>> noticed
> >>>>>>>>>>>>>>>>>>>>>>> that a BIGINT can't be converted to
> >>>>>> "TIMESTAMP(3) WITH
> >>>>>>>>>>>>>>>>>>>>>>> LOCAL
> >>>>>>>>>>>>>> TIME
> >>>>>>>>>>>>>>>>>>>> ZONE".
> >>>>>>>>>>>>>>>>>>>>>>> So maybe we need to support this, or
> >>>>>> use "TIMESTAMP(3) WITH
> >>>>>>>>>>>>>> LOCAL
> >>>>>>>>>>>>>>>>>> TIME
> >>>>>>>>>>>>>>>>>>>>>>> ZONE" as the defined type of Kafka
> >>>>>> timestamp? I think this
> >>>>>>>>>>>>>> makes
> >>>>>>>>>>>>>>>>>> sense,
> >>>>>>>>>>>>>>>>>>>>>>> because it represents the milli-seconds
> >>>>>> since epoch.
> >>>>>>>>>>>>>>>>>>>>>>> Regarding "DeserializationSchema
> >>>>>> doesn't need TypeInfo", I
> >>>>>>>>>>>>>> don't
> >>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>> so.
> >>>>>>>>>>>>>>>>>>>>>>> The DeserializationSchema implements
> >>>>>> ResultTypeQueryable,
> >>>>>>>>>> thus
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> implementation needs to return an
> >>>>>> output TypeInfo.
> >>>>>>>>>>>>>>>>>>>>>>> Besides, FlinkKafkaConsumer also
> >>>>>>>>>>>>>>>>>>>>>>> calls
> >>>>>> DeserializationSchema.getProducedType as the produced
> >>>>>>>>>>>>>> type
> >>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>> source function [1].
> >>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>>>>>> [1]:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
> >>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo
> >>>>>> Walther <
> >>>>>>>>>> twal...@apache.org>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I updated the FLIP again and hope
> >>>>>> that I could address the
> >>>>>>>>>>>>>>>> mentioned
> >>>>>>>>>>>>>>>>>>>>>>>> concerns.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> @Leonard: Thanks for the explanation.
> >>>>>> I wasn't aware that
> >>>>>>>>>>>>>> ts_ms
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> source.ts_ms have different
> >>>>>> semantics. I updated the FLIP
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>> expose
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> most commonly used properties
> >>>>>> separately. So frequently
> >>>>>>>>>>>>>>>>>>>>>>>> used
> >>>>>>>>>>>>>>>>>>>> properties
> >>>>>>>>>>>>>>>>>>>>>>>> are not hidden in the MAP anymore:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> debezium-json.ingestion-timestamp
> >>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.timestamp
> >>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.database
> >>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.schema
> >>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.table
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> However, since other properties
> >>>>>> depend on the used
> >>>>>>>>>>>>>>>> connector/vendor,
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> remaining options are stored in:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> debezium-json.source.properties
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> And accessed with:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS
> >>>>>>>>>>>>>>>>>> MAP<STRING,
> >>>>>>>>>>>>>>>>>>>>>>>> STRING>)['table']
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Otherwise it is not possible to
> >>>>>> figure out the value and
> >>>>>>>>>>>>>> column
> >>>>>>>>>>>>>>>> type
> >>>>>>>>>>>>>>>>>>>>>>>> during validation.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> @Jark: You convinced me in relaxing
> >>>>>> the CAST
> >>>>>>>>>>>>>>>>>>>>>>>> constraints. I
> >>>>>>>>>>>>>>> added
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>> dedicacated sub-section to the FLIP:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> For making the use of SYSTEM_METADATA
> >>>>>> easier and avoid
> >>>>>>>>>> nested
> >>>>>>>>>>>>>>>>>> casting
> >>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>> allow explicit casting to a target
> >>>>>> data type:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> rowtime AS
> >>>>>> CAST(SYSTEM_METADATA("timestamp") AS
> >>>>>>>>>>>>>>>>>>>>>>>> TIMESTAMP(3)
> >>>>>>>>>>>>>>> WITH
> >>>>>>>>>>>>>>>>>>>> LOCAL
> >>>>>>>>>>>>>>>>>>>>>>>> TIME ZONE)
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> A connector still produces and
> >>>>>> consumes the data type
> >>>>>>>>>> returned
> >>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>>>> `listMetadata()`. The planner will
> >>>>>> insert necessary
> >>>>>>>>>>>>>>>>>>>>>>>> explicit
> >>>>>>>>>>>>>>>> casts.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> In any case, the user must provide a
> >>>>>> CAST such that the
> >>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>> column
> >>>>>>>>>>>>>>>>>>>>>>>> receives a valid data type when
> >>>>>> constructing the table
> >>>>>>>>>> schema.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> "I don't see a reason why
> >>>>>>>>>>>>>> `DecodingFormat#applyReadableMetadata`
> >>>>>>>>>>>>>>>>>>>> needs a
> >>>>>>>>>>>>>>>>>>>>>>>> DataType argument."
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Correct he DeserializationSchema
> >>>>>> doesn't need TypeInfo, it
> >>>>>>>>>> is
> >>>>>>>>>>>>>>>> always
> >>>>>>>>>>>>>>>>>>>>>>>> executed locally. It is the source
> >>>>>> that needs TypeInfo for
> >>>>>>>>>>>>>>>>>> serializing
> >>>>>>>>>>>>>>>>>>>>>>>> the record to the next operator. And
> >>>>>> that's this is
> >>>>>>>>>>>>>>>>>>>>>>>> what we
> >>>>>>>>>>>>>>>> provide.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> @Danny:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> “SYSTEM_METADATA("offset")` returns
> >>>>>> the NULL type by
> >>>>>>>>>> default”
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> We can also use some other means to
> >>>>>> represent an UNKNOWN
> >>>>>>>>>> data
> >>>>>>>>>>>>>>>> type.
> >>>>>>>>>>>>>>>>>> In
> >>>>>>>>>>>>>>>>>>>>>>>> the Flink type system, we use the
> >>>>>> NullType for it. The
> >>>>>>>>>>>>>> important
> >>>>>>>>>>>>>>>>>> part
> >>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> that the final data type is known for
> >>>>>> the entire computed
> >>>>>>>>>>>>>>> column.
> >>>>>>>>>>>>>>>>>> As I
> >>>>>>>>>>>>>>>>>>>>>>>> mentioned before, I would avoid the
> >>>>>> suggested option b)
> >>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>>>> similar to your suggestion. The CAST
> >>>>>> should be enough and
> >>>>>>>>>>>>>> allows
> >>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>> complex expressions in the computed
> >>>>>> column. Option b)
> >>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>>>>> parser
> >>>>>>>>>>>>>>>>>>>>>>>> changes.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On 08.09.20 06:21, Leonard Xu wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>> Hi, Timo
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for you explanation and
> >>>>>> update,  I have only one
> >>>>>>>>>>>>>>> question
> >>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>> the latest FLIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> About the MAP<STRING, STRING>
> >>>>>> DataType of key
> >>>>>>>>>>>>>>>>>>>> 'debezium-json.source', if
> >>>>>>>>>>>>>>>>>>>>>>>> user want to use the table name
> >>>>>> metadata, they need to
> >>>>>>>>>> write:
> >>>>>>>>>>>>>>>>>>>>>>>>> tableName STRING AS
> >>>>>>>>>>>>>> CAST(SYSTEM_METADATA('debeuim-json.source')
> >>>>>>>>>>>>>>>> AS
> >>>>>>>>>>>>>>>>>>>>>>>> MAP<STRING, STRING>)['table']
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> the expression is a little complex
> >>>>>> for user, Could we
> >>>>>>>>>>>>>>>>>>>>>>>>> only
> >>>>>>>>>>>>>>>> support
> >>>>>>>>>>>>>>>>>>>>>>>> necessary metas with simple DataType
> >>>>>> as following?
> >>>>>>>>>>>>>>>>>>>>>>>>> tableName STRING AS
> >>>>>>>>>>>>>>>>>>>>
> >>>>>> CAST(SYSTEM_METADATA('debeuim-json.source.table') AS
> >>>>>>>>>>>>>>>>>>>>>>>> STRING),
> >>>>>>>>>>>>>>>>>>>>>>>>> transactionTime LONG AS
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> CAST(SYSTEM_METADATA('debeuim-json.source.ts_ms') AS
> >>>>>>>>>> BIGINT),
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> In this way, we can simplify the
> >>>>>> expression, the mainly
> >>>>>>>>>> used
> >>>>>>>>>>>>>>>>>>>> metadata in
> >>>>>>>>>>>>>>>>>>>>>>>> changelog format may include
> >>>>>>>>>>>>>>>>>>>> 'database','table','source.ts_ms','ts_ms' from
> >>>>>>>>>>>>>>>>>>>>>>>> my side,
> >>>>>>>>>>>>>>>>>>>>>>>>> maybe we could only support them at
> >>>>>> first version.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Both Debezium and Canal have above
> >>>>>> four metadata, and I‘m
> >>>>>>>>>>>>>>> willing
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> take some subtasks in next
> >>>>>> development if necessary.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Debezium:
> >>>>>>>>>>>>>>>>>>>>>>>>> {
> >>>>>>>>>>>>>>>>>>>>>>>>>        "before": null,
> >>>>>>>>>>>>>>>>>>>>>>>>>        "after": {  "id":
> >>>>>> 101,"name": "scooter"},
> >>>>>>>>>>>>>>>>>>>>>>>>>        "source": {
> >>>>>>>>>>>>>>>>>>>>>>>>>          "db":
> >>>>>> "inventory",                  # 1.
> >>>>>>>>>>>>>>>>>>>>>>>>> database
> >>>>>>>>>>>>>> name
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> changelog belongs to.
> >>>>>>>>>>>>>>>>>>>>>>>>>          "table":
> >>>>>> "products",                # 2.
> >>>>>>>>>>>>>>>>>>>>>>>>> table name
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> changelog
> >>>>>>>>>>>>>>>>>>>>>>>> belongs to.
> >>>>>>>>>>>>>>>>>>>>>>>>>          "ts_ms":
> >>>>>> 1589355504100,             # 3.
> >>>>>>>>>>>>>>>>>>>>>>>>> timestamp
> >>>>>>>>>>>> of
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>>>>>>>>> happened in database system, i.e.:
> >>>>>> transaction time in
> >>>>>>>>>>>>>> database.
> >>>>>>>>>>>>>>>>>>>>>>>>>          "connector": "mysql",
> >>>>>>>>>>>>>>>>>>>>>>>>>          ….
> >>>>>>>>>>>>>>>>>>>>>>>>>        },
> >>>>>>>>>>>>>>>>>>>>>>>>>        "ts_ms":
> >>>>>> 1589355606100,              # 4.
> >>>>>>>>>>>>>>>>>>>>>>>>> timestamp
> >>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> debezium
> >>>>>>>>>>>>>>>>>>>>>>>> processed the changelog.
> >>>>>>>>>>>>>>>>>>>>>>>>>        "op": "c",
> >>>>>>>>>>>>>>>>>>>>>>>>>        "transaction": null
> >>>>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Canal:
> >>>>>>>>>>>>>>>>>>>>>>>>> {
> >>>>>>>>>>>>>>>>>>>>>>>>>        "data": [{  "id": "102",
> >>>>>> "name": "car battery" }],
> >>>>>>>>>>>>>>>>>>>>>>>>>        "database":
> >>>>>> "inventory",      # 1. database
> >>>>>>>>>>>>>>>>>>>>>>>>> name the
> >>>>>>>>>>>>>>>> changelog
> >>>>>>>>>>>>>>>>>>>>>>>> belongs to.
> >>>>>>>>>>>>>>>>>>>>>>>>>        "table":
> >>>>>> "products",          # 2. table name the
> >>>>>>>>>>>>>>> changelog
> >>>>>>>>>>>>>>>>>>>> belongs
> >>>>>>>>>>>>>>>>>>>>>>>> to.
> >>>>>>>>>>>>>>>>>>>>>>>>>        "es":
> >>>>>> 1589374013000,          # 3. execution
> >>>>>>>>>>>>>>>>>>>>>>>>> time of
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>> database system, i.e.: transaction
> >>>>>> time in database.
> >>>>>>>>>>>>>>>>>>>>>>>>>        "ts":
> >>>>>> 1589374013680,          # 4. timestamp
> >>>>>>>>>>>>>>>>>>>>>>>>> when the
> >>>>>>>>>>>>>>>> cannal
> >>>>>>>>>>>>>>>>>>>>>>>> processed the changelog.
> >>>>>>>>>>>>>>>>>>>>>>>>>        "isDdl": false,
> >>>>>>>>>>>>>>>>>>>>>>>>>        "mysqlType": {},
> >>>>>>>>>>>>>>>>>>>>>>>>>        ....
> >>>>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Best
> >>>>>>>>>>>>>>>>>>>>>>>>> Leonard
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 在 2020年9月8日,11:57,Danny Chan
> >>>>>>>>>>>>>>>>>>>>>>>>>> <yuzhao....@gmail.com> 写道:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Timo ~
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> The FLIP was already in pretty
> >>>>>> good shape, I have only 2
> >>>>>>>>>>>>>>>> questions
> >>>>>>>>>>>>>>>>>>>> here:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 1.
> >>>>>> “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a
> >>>>>>>>>>>>>> valid
> >>>>>>>>>>>>>>>>>>>> read-only
> >>>>>>>>>>>>>>>>>>>>>>>> computed column for Kafka and can be
> >>>>>> extracted by the
> >>>>>>>>>>>>>> planner.”
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> What is the pros we follow the
> >>>>>> SQL-SERVER syntax here ?
> >>>>>>>>>>>>>>> Usually
> >>>>>>>>>>>>>>>> an
> >>>>>>>>>>>>>>>>>>>>>>>> expression return type can be
> >>>>>> inferred automatically.
> >>>>>>>>>>>>>>>>>>>>>>>> But I
> >>>>>>>>>>>>>>> guess
> >>>>>>>>>>>>>>>>>>>>>>>> SQL-SERVER does not have function
> >>>>>> like SYSTEM_METADATA
> >>>>>>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>> actually
> >>>>>>>>>>>>>>>>>>>> does
> >>>>>>>>>>>>>>>>>>>>>>>> not have a specific return type.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> And why not use the Oracle or
> >>>>>> MySQL syntax there ?
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> column_name [datatype] [GENERATED
> >>>>>> ALWAYS] AS
> >>>>>>>>>>>>>>>>>>>>>>>>>> (expression)
> >>>>>>>>>>>>>>>>>> [VIRTUAL]
> >>>>>>>>>>>>>>>>>>>>>>>>>> Which is more straight-forward.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 2. “SYSTEM_METADATA("offset")`
> >>>>>> returns the NULL type by
> >>>>>>>>>>>>>>> default”
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> The default type should not be
> >>>>>> NULL because only NULL
> >>>>>>>>>>>>>> literal
> >>>>>>>>>>>>>>>> does
> >>>>>>>>>>>>>>>>>>>>>>>> that. Usually we use ANY as the type
> >>>>>> if we do not know the
> >>>>>>>>>>>>>>>> specific
> >>>>>>>>>>>>>>>>>>>> type in
> >>>>>>>>>>>>>>>>>>>>>>>> the SQL context. ANY means the
> >>>>>> physical value can be any
> >>>>>>>>>> java
> >>>>>>>>>>>>>>>>>> object.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>
> >>>>>> https://oracle-base.com/articles/11g/virtual-columns-11gr1
> >>>>>>>>>>>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
> >>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>>>>>>>>>>>>>> 在 2020年9月4日 +0800 PM4:48,Timo
> >>>>>> Walther
> >>>>>>>>>>>>>>>>>>>>>>>>>> <twal...@apache.org
> >>>>>>>>>>>>>>>> ,写道:
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I completely reworked FLIP-107.
> >>>>>> It now covers the full
> >>>>>>>>>>>>>> story
> >>>>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> read
> >>>>>>>>>>>>>>>>>>>>>>>>>>> and write metadata from/to
> >>>>>> connectors and formats. It
> >>>>>>>>>>>>>>> considers
> >>>>>>>>>>>>>>>>>>>> all of
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the latest FLIPs, namely
> >>>>>> FLIP-95, FLIP-132 and
> >>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP-122.
> >>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>> introduces
> >>>>>>>>>>>>>>>>>>>>>>>>>>> the concept of PERSISTED
> >>>>>> computed columns and leaves
> >>>>>>>>>>>>>>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>>> partitioning
> >>>>>>>>>>>>>>>>>>>>>>>>>>> for now.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Looking forward to your
> >>>>>> feedback.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On 04.03.20 09:45, Kurt Young
> >>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry, forgot one question.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Can we make the
> >>>>>> value.fields-include more
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> orthogonal?
> >>>>>>>>>>>>>>> Like
> >>>>>>>>>>>>>>>>>> one
> >>>>>>>>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> specify it as "EXCEPT_KEY,
> >>>>>> EXCEPT_TIMESTAMP".
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> With current EXCEPT_KEY and
> >>>>>> EXCEPT_KEY_TIMESTAMP,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> users
> >>>>>>>>>>>>>> can
> >>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>> config to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> just ignore timestamp but
> >>>>>> keep key.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Kurt
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 4, 2020 at 4:42
> >>>>>> PM Kurt Young <
> >>>>>>>>>>>>>> ykt...@gmail.com
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Dawid,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have a couple of
> >>>>>> questions around key fields,
> >>>>>>>>>> actually
> >>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> other questions but want to
> >>>>>> be focused on key fields
> >>>>>>>>>>>>>> first.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. I don't fully understand
> >>>>>> the usage of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> "key.fields".
> >>>>>>>>>> Is
> >>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>> option only
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> valid during write
> >>>>>> operation? Because for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> reading, I can't imagine
> >>>>>> how such options can be
> >>>>>>>>>>>>>> applied. I
> >>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>> expect
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> that there might be a
> >>>>>> SYSTEM_METADATA("key")
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> to read and assign the key
> >>>>>> to a normal field?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. If "key.fields" is only
> >>>>>> valid in write
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> operation, I
> >>>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> propose we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> can simplify the options to
> >>>>>> not introducing
> >>>>>>>>>>>>>> key.format.type
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> other related options. I
> >>>>>> think a single "key.field"
> >>>>>>>>>> (not
> >>>>>>>>>>>>>>>>>> fields)
> >>>>>>>>>>>>>>>>>>>>>>>> would be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> enough, users can use UDF
> >>>>>> to calculate whatever key
> >>>>>>>>>> they
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> want before sink.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Also I don't want to
> >>>>>> introduce "value.format.type"
> >>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> "value.format.xxx" with the
> >>>>>> "value" prefix. Not every
> >>>>>>>>>>>>>>>> connector
> >>>>>>>>>>>>>>>>>>>> has a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> concept
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> of key and values. The old
> >>>>>> parameter "format.type"
> >>>>>>>>>>>>>> already
> >>>>>>>>>>>>>>>> good
> >>>>>>>>>>>>>>>>>>>>>>>> enough to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> use.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kurt
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Mar 3, 2020 at
> >>>>>> 10:40 PM Jark Wu <
> >>>>>>>>>>>>>> imj...@gmail.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dawid,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have two more questions.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> SupportsMetadata
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Introducing
> >>>>>> SupportsMetadata sounds good to me.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> But I
> >>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>>>>> questions
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> regarding to this
> >>>>>> interface.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) How do the source know
> >>>>>> what the expected return
> >>>>>>>>>> type
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>> each
> >>>>>>>>>>>>>>>>>>>>>>>> metadata?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) Where to put the
> >>>>>> metadata fields? Append to the
> >>>>>>>>>>>>>>> existing
> >>>>>>>>>>>>>>>>>>>> physical
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fields?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If yes, I would suggest
> >>>>>> to change the signature to
> >>>>>>>>>>>>>>>>>> `TableSource
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> appendMetadataFields(String[] metadataNames,
> >>>>>>>>>> DataType[]
> >>>>>>>>>>>>>>>>>>>>>>>> metadataTypes)`
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> SYSTEM_METADATA("partition")
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Can SYSTEM_METADATA()
> >>>>>> function be used nested in a
> >>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>> column
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> expression? If yes, how
> >>>>>> to specify the return
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> type of
> >>>>>>>>>>>>>>>>>>>>>>>> SYSTEM_METADATA?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 3 Mar 2020 at
> >>>>>> 17:06, Dawid Wysakowicz <
> >>>>>>>>>>>>>>>>>>>>>>>> dwysakow...@apache.org>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. I thought a bit more
> >>>>>> on how the source would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emit
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> columns
> >>>>>>>>>>>>>>>>>>>>>>>> and I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> now see its not exactly
> >>>>>> the same as regular
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns.
> >>>>>>>>>> I
> >>>>>>>>>>>>>>> see
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> elaborate a bit more on
> >>>>>> that in the FLIP as you
> >>>>>>>>>> asked,
> >>>>>>>>>>>>>>>> Jark.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I do agree mostly with
> >>>>>> Danny on how we should do
> >>>>>>>>>> that.
> >>>>>>>>>>>>>>> One
> >>>>>>>>>>>>>>>>>>>>>>>> additional
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> things I would
> >>>>>> introduce is an
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> interface
> >>>>>> SupportsMetadata {
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> boolean
> >>>>>> supportsMetadata(Set<String>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadataFields);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> TableSource
> >>>>>> generateMetadataFields(Set<String>
> >>>>>>>>>>>>>>>>>> metadataFields);
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This way the source
> >>>>>> would have to declare/emit only
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> requested
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadata fields. In
> >>>>>> order not to clash with user
> >>>>>>>>>>>>>> defined
> >>>>>>>>>>>>>>>>>>>> fields.
> >>>>>>>>>>>>>>>>>>>>>>>> When
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> emitting the metadata
> >>>>>> field I would prepend the
> >>>>>>>>>> column
> >>>>>>>>>>>>>>> name
> >>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> __system_{property_name}. Therefore when requested
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> SYSTEM_METADATA("partition") the source would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> append
> >>>>>>>>>> a
> >>>>>>>>>>>>>>>> field
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> __system_partition to
> >>>>>> the schema. This would be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> never
> >>>>>>>>>>>>>>>> visible
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> user as it would be
> >>>>>> used only for the subsequent
> >>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>> columns.
> >>>>>>>>>>>>>>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that makes sense to
> >>>>>> you, I will update the FLIP
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>> description.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. CAST vs explicit
> >>>>>> type in computed columns
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Here I agree with
> >>>>>> Danny. It is also the current
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> proposal.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Partitioning on
> >>>>>> computed column vs function
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Here I also agree with
> >>>>>> Danny. I also think those
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>> orthogonal. I
> >>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> leave out the STORED
> >>>>>> computed columns out of the
> >>>>>>>>>>>>>>>> discussion.
> >>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>> don't see
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> how do they relate to
> >>>>>> the partitioning. I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> already put
> >>>>>>>>>>>>>>> both
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> those
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cases in the document.
> >>>>>> We can either partition on a
> >>>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>>>>>> column or
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> use a udf in a
> >>>>>> partioned by clause. I am fine with
> >>>>>>>>>>>>>>> leaving
> >>>>>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioning by udf in
> >>>>>> the first version if you
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> still
> >>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> concerns.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for your question
> >>>>>> Danny. It depends which
> >>>>>>>>>>>>>> partitioning
> >>>>>>>>>>>>>>>>>>>> strategy
> >>>>>>>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> use.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For the HASH
> >>>>>> partitioning strategy I thought it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>> work
> >>>>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> explained. It would be
> >>>>>> N = MOD(expr, num). I am not
> >>>>>>>>>>>>>> sure
> >>>>>>>>>>>>>>>>>>>> though if
> >>>>>>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should introduce the
> >>>>>> PARTITIONS clause. Usually
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>> does
> >>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>> own
> >>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data and the partitions
> >>>>>> are already an intrinsic
> >>>>>>>>>>>>>> property
> >>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> underlying source e.g.
> >>>>>> for kafka we do not create
> >>>>>>>>>>>>>> topics,
> >>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> describe pre-existing
> >>>>>> pre-partitioned topic.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. timestamp vs
> >>>>>> timestamp.field vs
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector.field vs
> >>>>>>>>>>>>>> ...
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with changing
> >>>>>> it to timestamp.field to be
> >>>>>>>>>>>>>>>>>> consistent
> >>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> other value.fields and
> >>>>>> key.fields. Actually that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was
> >>>>>>>>>>>>>> also
> >>>>>>>>>>>>>>>> my
> >>>>>>>>>>>>>>>>>>>>>>>> initial
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> proposal in a first
> >>>>>> draft I prepared. I changed it
> >>>>>>>>>>>>>>>> afterwards
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> shorten
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the key.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 03/03/2020 09:00,
> >>>>>> Danny Chan wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dawid for
> >>>>>> bringing up this discussion, I
> >>>>>>>>>> think
> >>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>> useful
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> feature ~
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About how the
> >>>>>> metadata outputs from source
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it is
> >>>>>> completely orthogonal, computed
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> column
> >>>>>>>>>>>>>>> push
> >>>>>>>>>>>>>>>>>>>> down is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> another topic, this
> >>>>>> should not be a blocker but a
> >>>>>>>>>>>>>>>> promotion,
> >>>>>>>>>>>>>>>>>>>> if we
> >>>>>>>>>>>>>>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have any filters on the
> >>>>>> computed column, there
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is no
> >>>>>>>>>>>>>> need
> >>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>> do any
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pushings; the source
> >>>>>> node just emit the complete
> >>>>>>>>>> record
> >>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>> full
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with the declared
> >>>>>> physical schema, then when
> >>>>>>>>>> generating
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> virtual
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns, we would
> >>>>>> extract the metadata info and
> >>>>>>>>>> output
> >>>>>>>>>>>>>> as
> >>>>>>>>>>>>>>>>>> full
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns(with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> full schema).
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About the type of
> >>>>>> metadata column
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Personally i prefer
> >>>>>> explicit type instead of CAST,
> >>>>>>>>>>>>>> they
> >>>>>>>>>>>>>>>> are
> >>>>>>>>>>>>>>>>>>>>>>>> symantic
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> equivalent though,
> >>>>>> explict type is more
> >>>>>>>>>>>>>> straight-forward
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>> we can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> declare
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the nullable attribute
> >>>>>> there.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> About option A:
> >>>>>> partitioning based on acomputed
> >>>>>>>>>> column
> >>>>>>>>>>>>>>> VS
> >>>>>>>>>>>>>>>>>>>> option
> >>>>>>>>>>>>>>>>>>>>>>>> B:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioning with just
> >>>>>> a function
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>      From the FLIP,
> >>>>>> it seems that B's
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioning is
> >>>>>>>>>>>>>>> just
> >>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>> strategy
> >>>>>>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> writing data, the
> >>>>>> partiton column is not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> included in
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>>>>>>>> schema,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's just useless when
> >>>>>> reading from that.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Compared to A, we
> >>>>>> do not need to generate the
> >>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>>> column
> >>>>>>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> selecting from the
> >>>>>> table(but insert into)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - For A we can also
> >>>>>> mark the column as STORED when
> >>>>>>>>>> we
> >>>>>>>>>>>>>>> want
> >>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So in my opition they
> >>>>>> are orthogonal, we can
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> support
> >>>>>>>>>>>>>>>> both, i
> >>>>>>>>>>>>>>>>>>>> saw
> >>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> MySQL/Oracle[1][2]
> >>>>>> would suggest to also define the
> >>>>>>>>>>>>>>>>>> PARTITIONS
> >>>>>>>>>>>>>>>>>>>>>>>> num, and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions are managed
> >>>>>> under a "tablenamespace",
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> record is stored is
> >>>>>> partition number N, where N =
> >>>>>>>>>>>>>>> MOD(expr,
> >>>>>>>>>>>>>>>>>>>> num),
> >>>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> design, which partiton
> >>>>>> the record would persist ?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>> https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> >>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 在 2020年3月2日 +0800
> >>>>>> PM6:16,Dawid Wysakowicz <
> >>>>>>>>>>>>>>>>>>>> dwysakow...@apache.org
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ,写道:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Jark,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ad. 2 I added a
> >>>>>> section to discuss relation to
> >>>>>>>>>>>>>> FLIP-63
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ad. 3 Yes, I also
> >>>>>> tried to somewhat keep
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> hierarchy
> >>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>> properties.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Therefore you have the
> >>>>>> key.format.type.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also considered
> >>>>>> exactly what you are suggesting
> >>>>>>>>>>>>>>>>>> (prefixing
> >>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector or kafka). I
> >>>>>> should've put that into an
> >>>>>>>>>>>>>>>>>>>> Option/Rejected
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> alternatives.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I agree timestamp,
> >>>>>> key.*, value.* are connector
> >>>>>>>>>>>>>>>> properties.
> >>>>>>>>>>>>>>>>>>>> Why I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wanted to suggest not
> >>>>>> adding that prefix in the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> first
> >>>>>>>>>>>>>>>> version
> >>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> actually all the
> >>>>>> properties in the WITH section are
> >>>>>>>>>>>>>>>> connector
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> properties.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Even format is in the
> >>>>>> end a connector property as
> >>>>>>>>>> some
> >>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> sources
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not have a format, imo.
> >>>>>> The benefit of not
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> adding the
> >>>>>>>>>>>>>>>> prefix
> >>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>> that it
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> makes the keys a bit
> >>>>>> shorter. Imagine prefixing all
> >>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> properties
> >>>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> connector (or if we go
> >>>>>> with FLINK-12557:
> >>>>>>>>>>>>>> elasticsearch):
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> elasticsearch.key.format.type: csv
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> elasticsearch.key.format.field: ....
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> elasticsearch.key.format.delimiter: ....
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> elasticsearch.key.format.*: ....
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am fine with
> >>>>>> doing it though if this is a
> >>>>>>>>>> preferred
> >>>>>>>>>>>>>>>>>>>> approach
> >>>>>>>>>>>>>>>>>>>>>>>> in the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> community.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ad in-line comments:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I forgot to update
> >>>>>> the `value.fields.include`
> >>>>>>>>>>>>>> property.
> >>>>>>>>>>>>>>>> It
> >>>>>>>>>>>>>>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> value.fields-include.
> >>>>>> Which I think you also
> >>>>>>>>>> suggested
> >>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> comment,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> right?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the cast vs
> >>>>>> declaring output type of
> >>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>> column.
> >>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it's better not to use
> >>>>>> CAST, but declare a type
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of an
> >>>>>>>>>>>>>>>>>>>> expression
> >>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> later
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on infer the output
> >>>>>> type of SYSTEM_METADATA. The
> >>>>>>>>>> reason
> >>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it will be easier to
> >>>>>> implement e.g. filter push
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> downs
> >>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>> working
> >>>>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> native types of the
> >>>>>> source, e.g. in case of Kafka's
> >>>>>>>>>>>>>>>> offset, i
> >>>>>>>>>>>>>>>>>>>>>>>> think it's
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> better to pushdown long
> >>>>>> rather than string. This
> >>>>>>>>>> could
> >>>>>>>>>>>>>>> let
> >>>>>>>>>>>>>>>> us
> >>>>>>>>>>>>>>>>>>>> push
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> expression like e.g.
> >>>>>> offset > 12345 & offset <
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 59382.
> >>>>>>>>>>>>>>>>>>>> Otherwise we
> >>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have to push down
> >>>>>> cast(offset, long) > 12345 &&
> >>>>>>>>>>>>>>>> cast(offset,
> >>>>>>>>>>>>>>>>>>>> long)
> >>>>>>>>>>>>>>>>>>>>>>>> <
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 59382.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Moreover I think we
> >>>>>> need to introduce the type for
> >>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>> columns
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> anyway
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to support functions
> >>>>>> that infer output type
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> based on
> >>>>>>>>>>>>>>>> expected
> >>>>>>>>>>>>>>>>>>>>>>>> return
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> type.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the computed
> >>>>>> column push down. Yes,
> >>>>>>>>>>>>>>>> SYSTEM_METADATA
> >>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to be pushed down to
> >>>>>> the source. If it is not
> >>>>>>>>>> possible
> >>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>> planner
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fail. As far as I know
> >>>>>> computed columns push down
> >>>>>>>>>> will
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>> part
> >>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>> source
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rework, won't it? ;)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As for the
> >>>>>> persisted computed column. I think
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it is
> >>>>>>>>>>>>>>>>>>>> completely
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> orthogonal. In my
> >>>>>> current proposal you can also
> >>>>>>>>>>>>>> partition
> >>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>> a
> >>>>>>>>>>>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> column. The difference
> >>>>>> between using a udf in
> >>>>>>>>>>>>>> partitioned
> >>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>> vs
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitioned
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by a computed column is
> >>>>>> that when you partition
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by a
> >>>>>>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>>>>>>> column
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> column must be also
> >>>>>> computed when reading the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table.
> >>>>>>>>>> If
> >>>>>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>>>> use a
> >>>>>>>>>>>>>>>>>>>>>>>> udf in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the partitioned by, the
> >>>>>> expression is computed only
> >>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>> inserting
> >>>>>>>>>>>>>>>>>>>>>>>> into
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hope this answers
> >>>>>> some of your questions. Looking
> >>>>>>>>>>>>>>> forward
> >>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>> further
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggestions.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 02/03/2020
> >>>>>> 05:18, Jark Wu wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks Dawid for
> >>>>>> starting such a great
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussion.
> >>>>>>>>>>>>>>>> Reaing
> >>>>>>>>>>>>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> key-part
> >>>>>> information from source is an important
> >>>>>>>>>>>>>>> feature
> >>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> streaming
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In general, I
> >>>>>> agree with the proposal of the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will leave my
> >>>>>> thoughts and comments here:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) +1 to use
> >>>>>> connector properties instead of
> >>>>>>>>>>>>>>> introducing
> >>>>>>>>>>>>>>>>>>>> HEADER
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> keyword as
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the reason you
> >>>>>> mentioned in the FLIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) we already
> >>>>>> introduced PARTITIONED BY in
> >>>>>>>>>> FLIP-63.
> >>>>>>>>>>>>>>>> Maybe
> >>>>>>>>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> add a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> section to
> >>>>>> explain what's the relationship
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> between
> >>>>>>>>>>>>>>> them.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do their concepts
> >>>>>> conflict? Could INSERT
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PARTITION
> >>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>> used
> >>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PARTITIONED table
> >>>>>> in this FLIP?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) Currently,
> >>>>>> properties are hierarchical in
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink
> >>>>>>>>>>>>>>> SQL.
> >>>>>>>>>>>>>>>>>>>> Shall we
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> new introduced
> >>>>>> properties more hierarchical?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example,
> >>>>>> "timestamp" =>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>> "connector.timestamp"?
> >>>>>>>>>>>>>>>>>>>> (actually, I
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> prefer
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "kafka.timestamp"
> >>>>>> which is another
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> improvement for
> >>>>>>>>>>>>>>>>>>>> properties
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> FLINK-12557)
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A single
> >>>>>> "timestamp" in properties may mislead
> >>>>>>>>>> users
> >>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> field
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a rowtime
> >>>>>> attribute.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also left some
> >>>>>> minor comments in the FLIP.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, 1 Mar
> >>>>>> 2020 at 22:30, Dawid Wysakowicz <
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> dwysakow...@apache.org>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I would like to
> >>>>>> propose an improvement that
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would
> >>>>>>>>>>>>>>>> enable
> >>>>>>>>>>>>>>>>>>>>>>>> reading
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> columns from
> >>>>>> different parts of source records.
> >>>>>>>>>>>>>>> Besides
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> main
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> payload
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> majority (if
> >>>>>> not all of the sources) expose
> >>>>>>>>>>>>>>> additional
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> information. It
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can be simply a
> >>>>>> read-only metadata such as
> >>>>>>>>>> offset,
> >>>>>>>>>>>>>>>>>>>> ingestion
> >>>>>>>>>>>>>>>>>>>>>>>> time
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or a
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> read and write
> >>>>>> parts of the record that contain
> >>>>>>>>>>>>>> data
> >>>>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> additionally
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> serve different
> >>>>>> purposes (partitioning,
> >>>>>>>>>> compaction
> >>>>>>>>>>>>>>>> etc.),
> >>>>>>>>>>>>>>>>>>>> e.g.
> >>>>>>>>>>>>>>>>>>>>>>>> key
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timestamp in
> >>>>>> Kafka.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We should make
> >>>>>> it possible to read and write
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data
> >>>>>>>>>>>>>>> from
> >>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>> those
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> locations. In
> >>>>>> this proposal I discuss reading
> >>>>>>>>>>>>>>>>>> partitioning
> >>>>>>>>>>>>>>>>>>>>>>>> data,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> completeness
> >>>>>> this proposal discusses also the
> >>>>>>>>>>>>>>>>>> partitioning
> >>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> writing
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data out.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I am looking
> >>>>>> forward to your comments.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> You can access
> >>>>>> the FLIP here:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> >>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> >
>
>

Reply via email to