“Personally, I still like the computed column design more because it
allows to have full flexibility to compute the final column”

I have the same feeling, the non-standard syntax "timestamp INT
SYSTEM_METADATA("ts")" is neither a computed column nor normal column. It
looks very likely a computed column but it's not (no AS keyword there), we
should be cautious for such syntax because we use a function as a column
constraint. No SQL vendor has such a syntax.

Can we just use the SQL keyword as a constraint to mark the column as
metadata ?

timestamp INT METADATA [FROM 'my-timestamp-field'] [VIRTUAL]

Note that the "FROM 'field name'" is only needed when the name conflicts
with the declared table column name, when there are no conflicts, we can
simplify it to:

timestamp INT METADATA

By default, the field is non-virtual and can be read and written, users
need to mark the column as virtual when it is only readable.

Timo Walther <twal...@apache.org> 于2020年9月9日周三 下午6:41写道:

> Hi everyone,
>
> "key" and "value" in the properties are a special case because they need
> to configure a format. So key and value are more than just metadata.
> Jark's example for setting a timestamp would work but as the FLIP
> discusses, we have way more metadata fields like headers, epoch-leader,
> etc. Having a property for all of this metadata would mess up the WITH
> section entirely. Furthermore, we also want to deal with metadata from
> the formats. Solving this through properties as well would further
> complicate the property design.
>
> Personally, I still like the computed column design more because it
> allows to have full flexibility to compute the final column:
>
> timestamp AS adjustTimestamp(CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3)))
>
> Instead of having a helper column and a real column in the table:
>
> helperTimestamp AS CAST(SYSTEM_METADATA("ts") AS TIMESTAMP(3))
> realTimestamp AS adjustTimestamp(helperTimestamp)
>
> But I see that the discussion leans towards:
>
> timestamp INT SYSTEM_METADATA("ts")
>
> Which is fine with me. It is the shortest solution, because we don't
> need additional CAST. We can discuss the syntax, so that confusion with
> computed columns can be avoided.
>
> timestamp INT USING SYSTEM_METADATA("ts")
> timestamp INT FROM SYSTEM_METADATA("ts")
> timestamp INT FROM SYSTEM_METADATA("ts") PERSISTED
>
> We use `SYSTEM_TIME` for temporal tables. I think prefixing with SYSTEM
> makes it clearer that it comes magically from the system.
>
> What do you think?
>
> Regards,
> Timo
>
>
>
> On 09.09.20 11:41, Jark Wu wrote:
> > Hi Danny,
> >
> > This is not Oracle and MySQL computed column syntax, because there is no
> > "AS" after the type.
> >
> > Hi everyone,
> >
> > If we want to use "offset INT SYSTEM_METADATA("offset")", then I think we
> > must further discuss about "PERSISED" or "VIRTUAL" keyword for query-sink
> > schema problem.
> > Personally, I think we can use a shorter keyword "METADATA" for
> > "SYSTEM_METADATA". Because "SYSTEM_METADATA" sounds like a system
> function
> > and confuse users this looks like a computed column.
> >
> >
> > Best,
> > Jark
> >
> >
> >
> > On Wed, 9 Sep 2020 at 17:23, Danny Chan <danny0...@apache.org> wrote:
> >
> >> "offset INT SYSTEM_METADATA("offset")"
> >>
> >> This is actually Oracle or MySQL style computed column syntax.
> >>
> >> "You are right that one could argue that "timestamp", "headers" are
> >> something like "key" and "value""
> >>
> >> I have the same feeling, both key value and headers timestamp are *real*
> >> data
> >> stored in the consumed record, they are not computed or generated.
> >>
> >> "Trying to solve everything via properties sounds rather like a hack to
> >> me"
> >>
> >> Things are not that hack if we can unify the routines or the definitions
> >> (all from the computed column way or all from the table options), i also
> >> think that it is a hacky that we mix in 2 kinds of syntax for different
> >> kinds of metadata (read-only and read-write). In this FLIP, we declare
> the
> >> Kafka key fields with table options but SYSTEM_METADATA for other
> metadata,
> >> that is a hacky thing or something in-consistent.
> >>
> >> Kurt Young <ykt...@gmail.com> 于2020年9月9日周三 下午4:48写道:
> >>
> >>>   I would vote for `offset INT SYSTEM_METADATA("offset")`.
> >>>
> >>> I don't think we can stick with the SQL standard in DDL part forever,
> >>> especially as there are more and more
> >>> requirements coming from different connectors and external systems.
> >>>
> >>> Best,
> >>> Kurt
> >>>
> >>>
> >>> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org>
> wrote:
> >>>
> >>>> Hi Jark,
> >>>>
> >>>> now we are back at the original design proposed by Dawid :D Yes, we
> >>>> should be cautious about adding new syntax. But the length of this
> >>>> discussion shows that we are looking for a good long-term solution. In
> >>>> this case I would rather vote for a deep integration into the syntax.
> >>>>
> >>>> Computed columns are also not SQL standard compliant. And our DDL is
> >>>> neither, so we have some degree of freedom here.
> >>>>
> >>>> Trying to solve everything via properties sounds rather like a hack to
> >>>> me. You are right that one could argue that "timestamp", "headers" are
> >>>> something like "key" and "value". However, mixing
> >>>>
> >>>> `offset AS SYSTEM_METADATA("offset")`
> >>>>
> >>>> and
> >>>>
> >>>> `'timestamp.field' = 'ts'`
> >>>>
> >>>> looks more confusing to users that an explicit
> >>>>
> >>>> `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> >>>>
> >>>> or
> >>>>
> >>>> `offset INT SYSTEM_METADATA("offset")`
> >>>>
> >>>> that is symetric for both source and sink.
> >>>>
> >>>> What do others think?
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 09.09.20 10:09, Jark Wu wrote:
> >>>>> Hi everyone,
> >>>>>
> >>>>> I think we have a conclusion that the writable metadata shouldn't be
> >>>>> defined as a computed column, but a normal column.
> >>>>>
> >>>>> "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the
> >>> approaches.
> >>>>> However, it is not SQL standard compliant, we need to be cautious
> >>> enough
> >>>>> when adding new syntax.
> >>>>> Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
> >>>>> resolve the query-sink schema problem if it is read-only metadata.
> >> That
> >>>>> adds more stuff to learn for users.
> >>>>>
> >>>>> >From my point of view, the "timestamp", "headers" are something like
> >>>> "key"
> >>>>> and "value" that stores with the real data. So why not define the
> >>>>> "timestamp" in the same way with "key" by using a "timestamp.field"
> >>>>> connector option?
> >>>>> On the other side, the read-only metadata, such as "offset",
> >> shouldn't
> >>> be
> >>>>> defined as a normal column. So why not use the existing computed
> >> column
> >>>>> syntax for such metadata? Then we don't have the query-sink schema
> >>>> problem.
> >>>>> So here is my proposal:
> >>>>>
> >>>>> CREATE TABLE kafka_table (
> >>>>>     id BIGINT,
> >>>>>     name STRING,
> >>>>>     col1 STRING,
> >>>>>     col2 STRING,
> >>>>>     ts TIMESTAMP(3) WITH LOCAL TIME ZONE,    -- ts is a normal field,
> >> so
> >>>> can
> >>>>> be read and written.
> >>>>>     offset AS SYSTEM_METADATA("offset")
> >>>>> ) WITH (
> >>>>>     'connector' = 'kafka',
> >>>>>     'topic' = 'test-topic',
> >>>>>     'key.fields' = 'id, name',
> >>>>>     'key.format' = 'csv',
> >>>>>     'value.format' = 'avro',
> >>>>>     'timestamp.field' = 'ts'    -- define the mapping of Kafka
> >> timestamp
> >>>>> );
> >>>>>
> >>>>> INSERT INTO kafka_table
> >>>>> SELECT id, name, col1, col2, rowtime FROM another_table;
> >>>>>
> >>>>> I think this can solve all the problems without introducing any new
> >>>> syntax.
> >>>>> The only minor disadvantage is that we separate the definition
> >>> way/syntax
> >>>>> of read-only metadata and read-write fields.
> >>>>> However, I don't think this is a big problem.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>>
> >>>>> On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org>
> >> wrote:
> >>>>>
> >>>>>> Hi Kurt,
> >>>>>>
> >>>>>> thanks for sharing your opinion. I'm totally up for not reusing
> >>> computed
> >>>>>> columns. I think Jark was a big supporter of this syntax, @Jark are
> >>> you
> >>>>>> fine with this as well? The non-computed column approach was only a
> >>>>>> "slightly rejected alternative".
> >>>>>>
> >>>>>> Furthermore, we would need to think about how such a new design
> >>>>>> influences the LIKE clause though.
> >>>>>>
> >>>>>> However, we should still keep the `PERSISTED` keyword as it
> >> influences
> >>>>>> the query->sink schema. If you look at the list of metadata for
> >>> existing
> >>>>>> connectors and formats, we currently offer only two writable
> >> metadata
> >>>>>> fields. Otherwise, one would need to declare two tables whenever a
> >>>>>> metadata columns is read (one for the source, one for the sink).
> >> This
> >>>>>> can be quite inconvientient e.g. for just reading the topic.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 09.09.20 08:52, Kurt Young wrote:
> >>>>>>> I also share the concern that reusing the computed column syntax
> >> but
> >>>> have
> >>>>>>> different semantics
> >>>>>>> would confuse users a lot.
> >>>>>>>
> >>>>>>> Besides, I think metadata fields are conceptually not the same with
> >>>>>>> computed columns. The metadata
> >>>>>>> field is a connector specific thing and it only contains the
> >>>> information
> >>>>>>> that where does the field come
> >>>>>>> from (during source) or where does the field need to write to
> >> (during
> >>>>>>> sink). It's more similar with normal
> >>>>>>> fields, with assumption that all these fields need going to the
> >> data
> >>>>>> part.
> >>>>>>>
> >>>>>>> Thus I'm more lean to the rejected alternative that Timo mentioned.
> >>>> And I
> >>>>>>> think we don't need the
> >>>>>>> PERSISTED keyword, SYSTEM_METADATA should be enough.
> >>>>>>>
> >>>>>>> During implementation, the framework only needs to pass such
> >> <field,
> >>>>>>> metadata field> information to the
> >>>>>>> connector, and the logic of handling such fields inside the
> >> connector
> >>>>>>> should be straightforward.
> >>>>>>>
> >>>>>>> Regarding the downside Timo mentioned:
> >>>>>>>
> >>>>>>>> The disadvantage is that users cannot call UDFs or parse
> >> timestamps.
> >>>>>>>
> >>>>>>> I think this is fairly simple to solve. Since the metadata field
> >>> isn't
> >>>> a
> >>>>>>> computed column anymore, we can support
> >>>>>>> referencing such fields in the computed column. For example:
> >>>>>>>
> >>>>>>> CREATE TABLE kafka_table (
> >>>>>>>         id BIGINT,
> >>>>>>>         name STRING,
> >>>>>>>         timestamp STRING SYSTEM_METADATA("timestamp"),  // get the
> >>>>>> timestamp
> >>>>>>> field from metadata
> >>>>>>>         ts AS to_timestamp(timestamp) // normal computed column,
> >> parse
> >>>> the
> >>>>>>> string to TIMESTAMP type by using the metadata field
> >>>>>>> ) WITH (
> >>>>>>>        ...
> >>>>>>> )
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Kurt
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org>
> >>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Leonard,
> >>>>>>>>
> >>>>>>>> the only alternative I see is that we introduce a concept that is
> >>>>>>>> completely different to computed columns. This is also mentioned
> >> in
> >>>> the
> >>>>>>>> rejected alternative section of the FLIP. Something like:
> >>>>>>>>
> >>>>>>>> CREATE TABLE kafka_table (
> >>>>>>>>         id BIGINT,
> >>>>>>>>         name STRING,
> >>>>>>>>         timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
> >>>>>>>>         headers MAP<STRING, BYTES> SYSTEM_METADATA("headers")
> >>> PERSISTED
> >>>>>>>> ) WITH (
> >>>>>>>>        ...
> >>>>>>>> )
> >>>>>>>>
> >>>>>>>> This way we would avoid confusion at all and can easily map
> >> columns
> >>> to
> >>>>>>>> metadata columns. The disadvantage is that users cannot call UDFs
> >> or
> >>>>>>>> parse timestamps. This would need to be done in a real computed
> >>>> column.
> >>>>>>>>
> >>>>>>>> I'm happy about better alternatives.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 08.09.20 15:37, Leonard Xu wrote:
> >>>>>>>>> HI, Timo
> >>>>>>>>>
> >>>>>>>>> Thanks for driving this FLIP.
> >>>>>>>>>
> >>>>>>>>> Sorry but I have a concern about Writing metadata via
> >>>> DynamicTableSink
> >>>>>>>> section:
> >>>>>>>>>
> >>>>>>>>> CREATE TABLE kafka_table (
> >>>>>>>>>       id BIGINT,
> >>>>>>>>>       name STRING,
> >>>>>>>>>       timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT)
> >>>>>> PERSISTED,
> >>>>>>>>>       headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING,
> >>>> BYTES>)
> >>>>>>>> PERSISTED
> >>>>>>>>> ) WITH (
> >>>>>>>>>       ...
> >>>>>>>>> )
> >>>>>>>>> An insert statement could look like:
> >>>>>>>>>
> >>>>>>>>> INSERT INTO kafka_table VALUES (
> >>>>>>>>>       (1, "ABC", 1599133672, MAP('checksum',
> >> computeChecksum(...)))
> >>>>>>>>> )
> >>>>>>>>>
> >>>>>>>>> The proposed INERT syntax does not make sense to me, because it
> >>>>>> contains
> >>>>>>>> computed(generated) column.
> >>>>>>>>> Both SQL server and Postgresql do not allow to insert value to
> >>>> computed
> >>>>>>>> columns even they are persisted, this boke the generated column
> >>>>>> semantics
> >>>>>>>> and may confuse user much.
> >>>>>>>>>
> >>>>>>>>> For SQL server computed column[1]:
> >>>>>>>>>> column_name AS computed_column_expression [ PERSISTED [ NOT
> >> NULL ]
> >>>>>> ]...
> >>>>>>>>>> NOTE: A computed column cannot be the target of an INSERT or
> >>> UPDATE
> >>>>>>>> statement.
> >>>>>>>>>
> >>>>>>>>> For Postgresql generated column[2]:
> >>>>>>>>>>      height_in numeric GENERATED ALWAYS AS (height_cm / 2.54)
> >>> STORED
> >>>>>>>>>> NOTE: A generated column cannot be written to directly. In
> >> INSERT
> >>> or
> >>>>>>>> UPDATE commands, a value cannot be specified for a generated
> >> column,
> >>>> but
> >>>>>>>> the keyword DEFAULT may be specified.
> >>>>>>>>>
> >>>>>>>>> It shouldn't be allowed to set/update value for generated column
> >>>> after
> >>>>>>>> lookup the SQL 2016:
> >>>>>>>>>> <insert statement> ::=
> >>>>>>>>>> INSERT INTO <insertion target> <insert columns and source>
> >>>>>>>>>>
> >>>>>>>>>> If <contextually typed table value constructor> CTTVC is
> >>> specified,
> >>>>>>>> then every <contextually typed row
> >>>>>>>>>> value constructor element> simply contained in CTTVC whose
> >>>>>> positionally
> >>>>>>>> corresponding <column name>
> >>>>>>>>>> in <insert column list> references a column of which some
> >>> underlying
> >>>>>>>> column is a generated column shall
> >>>>>>>>>> be a <default specification>.
> >>>>>>>>>> A <default specification> specifies the default value of some
> >>>>>>>> associated item.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> >>>>>>>> <
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> >>>>>>>>>
> >>>>>>>>> [2]
> >> https://www.postgresql.org/docs/12/ddl-generated-columns.html
> >>> <
> >>>>>>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html>
> >>>>>>>>>
> >>>>>>>>>> 在 2020年9月8日,17:31,Timo Walther <twal...@apache.org> 写道:
> >>>>>>>>>>
> >>>>>>>>>> Hi Jark,
> >>>>>>>>>>
> >>>>>>>>>> according to Flink's and Calcite's casting definition in [1][2]
> >>>>>>>> TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If
> >>> not,
> >>>>>> we
> >>>>>>>> will make it possible ;-)
> >>>>>>>>>>
> >>>>>>>>>> I'm aware of DeserializationSchema.getProducedType but I think
> >>> that
> >>>>>>>> this method is actually misplaced. The type should rather be
> >> passed
> >>> to
> >>>>>> the
> >>>>>>>> source itself.
> >>>>>>>>>>
> >>>>>>>>>> For our Kafka SQL source, we will also not use this method
> >> because
> >>>> the
> >>>>>>>> Kafka source will add own metadata in addition to the
> >>>>>>>> DeserializationSchema. So DeserializationSchema.getProducedType
> >> will
> >>>>>> never
> >>>>>>>> be read.
> >>>>>>>>>>
> >>>>>>>>>> For now I suggest to leave out the `DataType` from
> >>>>>>>> DecodingFormat.applyReadableMetadata. Also because the format's
> >>>> physical
> >>>>>>>> type is passed later in `createRuntimeDecoder`. If necessary, it
> >> can
> >>>> be
> >>>>>>>> computed manually by consumedType + metadata types. We will
> >> provide
> >>> a
> >>>>>>>> metadata utility class for that.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> [1]
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> >>>>>>>>>> [2]
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 08.09.20 10:52, Jark Wu wrote:
> >>>>>>>>>>> Hi Timo,
> >>>>>>>>>>> The updated CAST SYSTEM_METADATA behavior sounds good to me. I
> >>> just
> >>>>>>>> noticed
> >>>>>>>>>>> that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL
> >> TIME
> >>>>>>>> ZONE".
> >>>>>>>>>>> So maybe we need to support this, or use "TIMESTAMP(3) WITH
> >> LOCAL
> >>>>>> TIME
> >>>>>>>>>>> ZONE" as the defined type of Kafka timestamp? I think this
> >> makes
> >>>>>> sense,
> >>>>>>>>>>> because it represents the milli-seconds since epoch.
> >>>>>>>>>>> Regarding "DeserializationSchema doesn't need TypeInfo", I
> >> don't
> >>>>>> think
> >>>>>>>> so.
> >>>>>>>>>>> The DeserializationSchema implements ResultTypeQueryable, thus
> >>> the
> >>>>>>>>>>> implementation needs to return an output TypeInfo.
> >>>>>>>>>>> Besides, FlinkKafkaConsumer also
> >>>>>>>>>>> calls DeserializationSchema.getProducedType as the produced
> >> type
> >>> of
> >>>>>> the
> >>>>>>>>>>> source function [1].
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Jark
> >>>>>>>>>>> [1]:
> >>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
> >>>>>>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org>
> >>>>>> wrote:
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I updated the FLIP again and hope that I could address the
> >>>> mentioned
> >>>>>>>>>>>> concerns.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Leonard: Thanks for the explanation. I wasn't aware that
> >> ts_ms
> >>>> and
> >>>>>>>>>>>> source.ts_ms have different semantics. I updated the FLIP and
> >>>> expose
> >>>>>>>> the
> >>>>>>>>>>>> most commonly used properties separately. So frequently used
> >>>>>>>> properties
> >>>>>>>>>>>> are not hidden in the MAP anymore:
> >>>>>>>>>>>>
> >>>>>>>>>>>> debezium-json.ingestion-timestamp
> >>>>>>>>>>>> debezium-json.source.timestamp
> >>>>>>>>>>>> debezium-json.source.database
> >>>>>>>>>>>> debezium-json.source.schema
> >>>>>>>>>>>> debezium-json.source.table
> >>>>>>>>>>>>
> >>>>>>>>>>>> However, since other properties depend on the used
> >>>> connector/vendor,
> >>>>>>>> the
> >>>>>>>>>>>> remaining options are stored in:
> >>>>>>>>>>>>
> >>>>>>>>>>>> debezium-json.source.properties
> >>>>>>>>>>>>
> >>>>>>>>>>>> And accessed with:
> >>>>>>>>>>>>
> >>>>>>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS
> >>>>>> MAP<STRING,
> >>>>>>>>>>>> STRING>)['table']
> >>>>>>>>>>>>
> >>>>>>>>>>>> Otherwise it is not possible to figure out the value and
> >> column
> >>>> type
> >>>>>>>>>>>> during validation.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Jark: You convinced me in relaxing the CAST constraints. I
> >>> added
> >>>> a
> >>>>>>>>>>>> dedicacated sub-section to the FLIP:
> >>>>>>>>>>>>
> >>>>>>>>>>>> For making the use of SYSTEM_METADATA easier and avoid nested
> >>>>>> casting
> >>>>>>>> we
> >>>>>>>>>>>> allow explicit casting to a target data type:
> >>>>>>>>>>>>
> >>>>>>>>>>>> rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3)
> >>> WITH
> >>>>>>>> LOCAL
> >>>>>>>>>>>> TIME ZONE)
> >>>>>>>>>>>>
> >>>>>>>>>>>> A connector still produces and consumes the data type returned
> >>> by
> >>>>>>>>>>>> `listMetadata()`. The planner will insert necessary explicit
> >>>> casts.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In any case, the user must provide a CAST such that the
> >> computed
> >>>>>>>> column
> >>>>>>>>>>>> receives a valid data type when constructing the table schema.
> >>>>>>>>>>>>
> >>>>>>>>>>>> "I don't see a reason why
> >> `DecodingFormat#applyReadableMetadata`
> >>>>>>>> needs a
> >>>>>>>>>>>> DataType argument."
> >>>>>>>>>>>>
> >>>>>>>>>>>> Correct he DeserializationSchema doesn't need TypeInfo, it is
> >>>> always
> >>>>>>>>>>>> executed locally. It is the source that needs TypeInfo for
> >>>>>> serializing
> >>>>>>>>>>>> the record to the next operator. And that's this is what we
> >>>> provide.
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Danny:
> >>>>>>>>>>>>
> >>>>>>>>>>>> “SYSTEM_METADATA("offset")` returns the NULL type by default”
> >>>>>>>>>>>>
> >>>>>>>>>>>> We can also use some other means to represent an UNKNOWN data
> >>>> type.
> >>>>>> In
> >>>>>>>>>>>> the Flink type system, we use the NullType for it. The
> >> important
> >>>>>> part
> >>>>>>>> is
> >>>>>>>>>>>> that the final data type is known for the entire computed
> >>> column.
> >>>>>> As I
> >>>>>>>>>>>> mentioned before, I would avoid the suggested option b) that
> >>> would
> >>>>>> be
> >>>>>>>>>>>> similar to your suggestion. The CAST should be enough and
> >> allows
> >>>> for
> >>>>>>>>>>>> complex expressions in the computed column. Option b) would
> >> need
> >>>>>>>> parser
> >>>>>>>>>>>> changes.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 08.09.20 06:21, Leonard Xu wrote:
> >>>>>>>>>>>>> Hi, Timo
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for you explanation and update,  I have only one
> >>> question
> >>>>>> for
> >>>>>>>>>>>> the latest FLIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> About the MAP<STRING, STRING> DataType of key
> >>>>>>>> 'debezium-json.source', if
> >>>>>>>>>>>> user want to use the table name metadata, they need to write:
> >>>>>>>>>>>>> tableName STRING AS
> >> CAST(SYSTEM_METADATA('debeuim-json.source')
> >>>> AS
> >>>>>>>>>>>> MAP<STRING, STRING>)['table']
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the expression is a little complex for user, Could we only
> >>>> support
> >>>>>>>>>>>> necessary metas with simple DataType as following?
> >>>>>>>>>>>>> tableName STRING AS
> >>>>>>>> CAST(SYSTEM_METADATA('debeuim-json.source.table') AS
> >>>>>>>>>>>> STRING),
> >>>>>>>>>>>>> transactionTime LONG AS
> >>>>>>>>>>>> CAST(SYSTEM_METADATA('debeuim-json.source.ts_ms') AS BIGINT),
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> In this way, we can simplify the expression, the mainly used
> >>>>>>>> metadata in
> >>>>>>>>>>>> changelog format may include
> >>>>>>>> 'database','table','source.ts_ms','ts_ms' from
> >>>>>>>>>>>> my side,
> >>>>>>>>>>>>> maybe we could only support them at first version.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Both Debezium and Canal have above four metadata, and I‘m
> >>> willing
> >>>>>> to
> >>>>>>>>>>>> take some subtasks in next development if necessary.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Debezium:
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>>        "before": null,
> >>>>>>>>>>>>>        "after": {  "id": 101,"name": "scooter"},
> >>>>>>>>>>>>>        "source": {
> >>>>>>>>>>>>>          "db": "inventory",                  # 1. database
> >> name
> >>>> the
> >>>>>>>>>>>> changelog belongs to.
> >>>>>>>>>>>>>          "table": "products",                # 2. table name
> >> the
> >>>>>>>> changelog
> >>>>>>>>>>>> belongs to.
> >>>>>>>>>>>>>          "ts_ms": 1589355504100,             # 3. timestamp
> of
> >>> the
> >>>>>>>> change
> >>>>>>>>>>>> happened in database system, i.e.: transaction time in
> >> database.
> >>>>>>>>>>>>>          "connector": "mysql",
> >>>>>>>>>>>>>          ….
> >>>>>>>>>>>>>        },
> >>>>>>>>>>>>>        "ts_ms": 1589355606100,              # 4. timestamp
> >> when
> >>>> the
> >>>>>>>> debezium
> >>>>>>>>>>>> processed the changelog.
> >>>>>>>>>>>>>        "op": "c",
> >>>>>>>>>>>>>        "transaction": null
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Canal:
> >>>>>>>>>>>>> {
> >>>>>>>>>>>>>        "data": [{  "id": "102", "name": "car battery" }],
> >>>>>>>>>>>>>        "database": "inventory",      # 1. database name the
> >>>> changelog
> >>>>>>>>>>>> belongs to.
> >>>>>>>>>>>>>        "table": "products",          # 2. table name the
> >>> changelog
> >>>>>>>> belongs
> >>>>>>>>>>>> to.
> >>>>>>>>>>>>>        "es": 1589374013000,          # 3. execution time of
> >> the
> >>>>>> change
> >>>>>>>> in
> >>>>>>>>>>>> database system, i.e.: transaction time in database.
> >>>>>>>>>>>>>        "ts": 1589374013680,          # 4. timestamp when the
> >>>> cannal
> >>>>>>>>>>>> processed the changelog.
> >>>>>>>>>>>>>        "isDdl": false,
> >>>>>>>>>>>>>        "mysqlType": {},
> >>>>>>>>>>>>>        ....
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best
> >>>>>>>>>>>>> Leonard
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> 在 2020年9月8日,11:57,Danny Chan <yuzhao....@gmail.com> 写道:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks Timo ~
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The FLIP was already in pretty good shape, I have only 2
> >>>> questions
> >>>>>>>> here:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a
> >> valid
> >>>>>>>> read-only
> >>>>>>>>>>>> computed column for Kafka and can be extracted by the
> >> planner.”
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What is the pros we follow the SQL-SERVER syntax here ?
> >>> Usually
> >>>> an
> >>>>>>>>>>>> expression return type can be inferred automatically. But I
> >>> guess
> >>>>>>>>>>>> SQL-SERVER does not have function like SYSTEM_METADATA which
> >>>>>> actually
> >>>>>>>> does
> >>>>>>>>>>>> not have a specific return type.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> And why not use the Oracle or MySQL syntax there ?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> column_name [datatype] [GENERATED ALWAYS] AS (expression)
> >>>>>> [VIRTUAL]
> >>>>>>>>>>>>>> Which is more straight-forward.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2. “SYSTEM_METADATA("offset")` returns the NULL type by
> >>> default”
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The default type should not be NULL because only NULL
> >> literal
> >>>> does
> >>>>>>>>>>>> that. Usually we use ANY as the type if we do not know the
> >>>> specific
> >>>>>>>> type in
> >>>>>>>>>>>> the SQL context. ANY means the physical value can be any java
> >>>>>> object.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [1]
> >>> https://oracle-base.com/articles/11g/virtual-columns-11gr1
> >>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>> 在 2020年9月4日 +0800 PM4:48,Timo Walther <twal...@apache.org
> >>>> ,写道:
> >>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I completely reworked FLIP-107. It now covers the full
> >> story
> >>>> how
> >>>>>> to
> >>>>>>>>>>>> read
> >>>>>>>>>>>>>>> and write metadata from/to connectors and formats. It
> >>> considers
> >>>>>>>> all of
> >>>>>>>>>>>>>>> the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It
> >>>>>>>> introduces
> >>>>>>>>>>>>>>> the concept of PERSISTED computed columns and leaves out
> >>>>>>>> partitioning
> >>>>>>>>>>>>>>> for now.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>> Timo
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 04.03.20 09:45, Kurt Young wrote:
> >>>>>>>>>>>>>>>> Sorry, forgot one question.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 4. Can we make the value.fields-include more orthogonal?
> >>> Like
> >>>>>> one
> >>>>>>>> can
> >>>>>>>>>>>>>>>> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
> >>>>>>>>>>>>>>>> With current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users
> >> can
> >>>> not
> >>>>>>>>>>>> config to
> >>>>>>>>>>>>>>>> just ignore timestamp but keep key.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>> Kurt
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <
> >> ykt...@gmail.com
> >>>>
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Hi Dawid,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I have a couple of questions around key fields, actually
> >> I
> >>>> also
> >>>>>>>> have
> >>>>>>>>>>>> some
> >>>>>>>>>>>>>>>>> other questions but want to be focused on key fields
> >> first.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1. I don't fully understand the usage of "key.fields". Is
> >>>> this
> >>>>>>>>>>>> option only
> >>>>>>>>>>>>>>>>> valid during write operation? Because for
> >>>>>>>>>>>>>>>>> reading, I can't imagine how such options can be
> >> applied. I
> >>>>>> would
> >>>>>>>>>>>> expect
> >>>>>>>>>>>>>>>>> that there might be a SYSTEM_METADATA("key")
> >>>>>>>>>>>>>>>>> to read and assign the key to a normal field?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2. If "key.fields" is only valid in write operation, I
> >> want
> >>>> to
> >>>>>>>>>>>> propose we
> >>>>>>>>>>>>>>>>> can simplify the options to not introducing
> >> key.format.type
> >>>> and
> >>>>>>>>>>>>>>>>> other related options. I think a single "key.field" (not
> >>>>>> fields)
> >>>>>>>>>>>> would be
> >>>>>>>>>>>>>>>>> enough, users can use UDF to calculate whatever key they
> >>>>>>>>>>>>>>>>> want before sink.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 3. Also I don't want to introduce "value.format.type" and
> >>>>>>>>>>>>>>>>> "value.format.xxx" with the "value" prefix. Not every
> >>>> connector
> >>>>>>>> has a
> >>>>>>>>>>>>>>>>> concept
> >>>>>>>>>>>>>>>>> of key and values. The old parameter "format.type"
> >> already
> >>>> good
> >>>>>>>>>>>> enough to
> >>>>>>>>>>>>>>>>> use.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Kurt
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <
> >> imj...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks Dawid,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I have two more questions.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> SupportsMetadata
> >>>>>>>>>>>>>>>>>> Introducing SupportsMetadata sounds good to me. But I
> >> have
> >>>>>> some
> >>>>>>>>>>>> questions
> >>>>>>>>>>>>>>>>>> regarding to this interface.
> >>>>>>>>>>>>>>>>>> 1) How do the source know what the expected return type
> >> of
> >>>>>> each
> >>>>>>>>>>>> metadata?
> >>>>>>>>>>>>>>>>>> 2) Where to put the metadata fields? Append to the
> >>> existing
> >>>>>>>> physical
> >>>>>>>>>>>>>>>>>> fields?
> >>>>>>>>>>>>>>>>>> If yes, I would suggest to change the signature to
> >>>>>> `TableSource
> >>>>>>>>>>>>>>>>>> appendMetadataFields(String[] metadataNames, DataType[]
> >>>>>>>>>>>> metadataTypes)`
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> SYSTEM_METADATA("partition")
> >>>>>>>>>>>>>>>>>> Can SYSTEM_METADATA() function be used nested in a
> >>> computed
> >>>>>>>> column
> >>>>>>>>>>>>>>>>>> expression? If yes, how to specify the return type of
> >>>>>>>>>>>> SYSTEM_METADATA?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <
> >>>>>>>>>>>> dwysakow...@apache.org>
> >>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 1. I thought a bit more on how the source would emit
> >> the
> >>>>>>>> columns
> >>>>>>>>>>>> and I
> >>>>>>>>>>>>>>>>>>> now see its not exactly the same as regular columns. I
> >>> see
> >>>> a
> >>>>>>>> need
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>> elaborate a bit more on that in the FLIP as you asked,
> >>>> Jark.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I do agree mostly with Danny on how we should do that.
> >>> One
> >>>>>>>>>>>> additional
> >>>>>>>>>>>>>>>>>>> things I would introduce is an
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> interface SupportsMetadata {
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> boolean supportsMetadata(Set<String> metadataFields);
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> TableSource generateMetadataFields(Set<String>
> >>>>>> metadataFields);
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> This way the source would have to declare/emit only the
> >>>>>>>> requested
> >>>>>>>>>>>>>>>>>>> metadata fields. In order not to clash with user
> >> defined
> >>>>>>>> fields.
> >>>>>>>>>>>> When
> >>>>>>>>>>>>>>>>>>> emitting the metadata field I would prepend the column
> >>> name
> >>>>>>>> with
> >>>>>>>>>>>>>>>>>>> __system_{property_name}. Therefore when requested
> >>>>>>>>>>>>>>>>>>> SYSTEM_METADATA("partition") the source would append a
> >>>> field
> >>>>>>>>>>>>>>>>>>> __system_partition to the schema. This would be never
> >>>> visible
> >>>>>>>> to
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> user as it would be used only for the subsequent
> >> computed
> >>>>>>>> columns.
> >>>>>>>>>>>> If
> >>>>>>>>>>>>>>>>>>> that makes sense to you, I will update the FLIP with
> >> this
> >>>>>>>>>>>> description.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2. CAST vs explicit type in computed columns
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Here I agree with Danny. It is also the current state
> >> of
> >>>> the
> >>>>>>>>>>>> proposal.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 3. Partitioning on computed column vs function
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Here I also agree with Danny. I also think those are
> >>>>>>>> orthogonal. I
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>> leave out the STORED computed columns out of the
> >>>> discussion.
> >>>>>> I
> >>>>>>>>>>>> don't see
> >>>>>>>>>>>>>>>>>>> how do they relate to the partitioning. I already put
> >>> both
> >>>> of
> >>>>>>>> those
> >>>>>>>>>>>>>>>>>>> cases in the document. We can either partition on a
> >>>> computed
> >>>>>>>>>>>> column or
> >>>>>>>>>>>>>>>>>>> use a udf in a partioned by clause. I am fine with
> >>> leaving
> >>>>>> out
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>> partitioning by udf in the first version if you still
> >>> have
> >>>>>> some
> >>>>>>>>>>>>>>>>>> concerns.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> As for your question Danny. It depends which
> >> partitioning
> >>>>>>>> strategy
> >>>>>>>>>>>> you
> >>>>>>>>>>>>>>>>>> use.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> For the HASH partitioning strategy I thought it would
> >>> work
> >>>> as
> >>>>>>>> you
> >>>>>>>>>>>>>>>>>>> explained. It would be N = MOD(expr, num). I am not
> >> sure
> >>>>>>>> though if
> >>>>>>>>>>>> we
> >>>>>>>>>>>>>>>>>>> should introduce the PARTITIONS clause. Usually Flink
> >>> does
> >>>>>> not
> >>>>>>>> own
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> data and the partitions are already an intrinsic
> >> property
> >>>> of
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>> underlying source e.g. for kafka we do not create
> >> topics,
> >>>> but
> >>>>>>>> we
> >>>>>>>>>>>> just
> >>>>>>>>>>>>>>>>>>> describe pre-existing pre-partitioned topic.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 4. timestamp vs timestamp.field vs connector.field vs
> >> ...
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I am fine with changing it to timestamp.field to be
> >>>>>> consistent
> >>>>>>>> with
> >>>>>>>>>>>>>>>>>>> other value.fields and key.fields. Actually that was
> >> also
> >>>> my
> >>>>>>>>>>>> initial
> >>>>>>>>>>>>>>>>>>> proposal in a first draft I prepared. I changed it
> >>>> afterwards
> >>>>>>>> to
> >>>>>>>>>>>> shorten
> >>>>>>>>>>>>>>>>>>> the key.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 03/03/2020 09:00, Danny Chan wrote:
> >>>>>>>>>>>>>>>>>>>> Thanks Dawid for bringing up this discussion, I think
> >> it
> >>>> is
> >>>>>> a
> >>>>>>>>>>>> useful
> >>>>>>>>>>>>>>>>>>> feature ~
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> About how the metadata outputs from source
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I think it is completely orthogonal, computed column
> >>> push
> >>>>>>>> down is
> >>>>>>>>>>>>>>>>>>> another topic, this should not be a blocker but a
> >>>> promotion,
> >>>>>>>> if we
> >>>>>>>>>>>> do
> >>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>> have any filters on the computed column, there is no
> >> need
> >>>> to
> >>>>>>>> do any
> >>>>>>>>>>>>>>>>>>> pushings; the source node just emit the complete record
> >>>> with
> >>>>>>>> full
> >>>>>>>>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>>>> with the declared physical schema, then when generating
> >>> the
> >>>>>>>> virtual
> >>>>>>>>>>>>>>>>>>> columns, we would extract the metadata info and output
> >> as
> >>>>>> full
> >>>>>>>>>>>>>>>>>> columns(with
> >>>>>>>>>>>>>>>>>>> full schema).
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> About the type of metadata column
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Personally i prefer explicit type instead of CAST,
> >> they
> >>>> are
> >>>>>>>>>>>> symantic
> >>>>>>>>>>>>>>>>>>> equivalent though, explict type is more
> >> straight-forward
> >>>> and
> >>>>>>>> we can
> >>>>>>>>>>>>>>>>>> declare
> >>>>>>>>>>>>>>>>>>> the nullable attribute there.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> About option A: partitioning based on acomputed column
> >>> VS
> >>>>>>>> option
> >>>>>>>>>>>> B:
> >>>>>>>>>>>>>>>>>>> partitioning with just a function
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>      From the FLIP, it seems that B's partitioning is
> >>> just
> >>>> a
> >>>>>>>> strategy
> >>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>> writing data, the partiton column is not included in
> >> the
> >>>>>> table
> >>>>>>>>>>>> schema,
> >>>>>>>>>>>>>>>>>> so
> >>>>>>>>>>>>>>>>>>> it's just useless when reading from that.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> - Compared to A, we do not need to generate the
> >>> partition
> >>>>>>>> column
> >>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>> selecting from the table(but insert into)
> >>>>>>>>>>>>>>>>>>>> - For A we can also mark the column as STORED when we
> >>> want
> >>>>>> to
> >>>>>>>>>>>> persist
> >>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> So in my opition they are orthogonal, we can support
> >>>> both, i
> >>>>>>>> saw
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> MySQL/Oracle[1][2] would suggest to also define the
> >>>>>> PARTITIONS
> >>>>>>>>>>>> num, and
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> partitions are managed under a "tablenamespace", the
> >>>>>> partition
> >>>>>>>> in
> >>>>>>>>>>>> which
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> record is stored is partition number N, where N =
> >>> MOD(expr,
> >>>>>>>> num),
> >>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>> your
> >>>>>>>>>>>>>>>>>>> design, which partiton the record would persist ?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>
> >> https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> >>>>>>>>>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>> Danny Chan
> >>>>>>>>>>>>>>>>>>>> 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <
> >>>>>>>> dwysakow...@apache.org
> >>>>>>>>>>>>>>>>>>> ,写道:
> >>>>>>>>>>>>>>>>>>>>> Hi Jark,
> >>>>>>>>>>>>>>>>>>>>> Ad. 2 I added a section to discuss relation to
> >> FLIP-63
> >>>>>>>>>>>>>>>>>>>>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of
> >>>>>>>> properties.
> >>>>>>>>>>>>>>>>>>> Therefore you have the key.format.type.
> >>>>>>>>>>>>>>>>>>>>> I also considered exactly what you are suggesting
> >>>>>> (prefixing
> >>>>>>>> with
> >>>>>>>>>>>>>>>>>>> connector or kafka). I should've put that into an
> >>>>>>>> Option/Rejected
> >>>>>>>>>>>>>>>>>>> alternatives.
> >>>>>>>>>>>>>>>>>>>>> I agree timestamp, key.*, value.* are connector
> >>>> properties.
> >>>>>>>> Why I
> >>>>>>>>>>>>>>>>>>> wanted to suggest not adding that prefix in the first
> >>>> version
> >>>>>>>> is
> >>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>> actually all the properties in the WITH section are
> >>>> connector
> >>>>>>>>>>>>>>>>>> properties.
> >>>>>>>>>>>>>>>>>>> Even format is in the end a connector property as some
> >> of
> >>>> the
> >>>>>>>>>>>> sources
> >>>>>>>>>>>>>>>>>> might
> >>>>>>>>>>>>>>>>>>> not have a format, imo. The benefit of not adding the
> >>>> prefix
> >>>>>> is
> >>>>>>>>>>>> that it
> >>>>>>>>>>>>>>>>>>> makes the keys a bit shorter. Imagine prefixing all the
> >>>>>>>> properties
> >>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>> connector (or if we go with FLINK-12557:
> >> elasticsearch):
> >>>>>>>>>>>>>>>>>>>>> elasticsearch.key.format.type: csv
> >>>>>>>>>>>>>>>>>>>>> elasticsearch.key.format.field: ....
> >>>>>>>>>>>>>>>>>>>>> elasticsearch.key.format.delimiter: ....
> >>>>>>>>>>>>>>>>>>>>> elasticsearch.key.format.*: ....
> >>>>>>>>>>>>>>>>>>>>> I am fine with doing it though if this is a preferred
> >>>>>>>> approach
> >>>>>>>>>>>> in the
> >>>>>>>>>>>>>>>>>>> community.
> >>>>>>>>>>>>>>>>>>>>> Ad in-line comments:
> >>>>>>>>>>>>>>>>>>>>> I forgot to update the `value.fields.include`
> >> property.
> >>>> It
> >>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>>>>> value.fields-include. Which I think you also suggested
> >> in
> >>>> the
> >>>>>>>>>>>> comment,
> >>>>>>>>>>>>>>>>>>> right?
> >>>>>>>>>>>>>>>>>>>>> As for the cast vs declaring output type of computed
> >>>>>> column.
> >>>>>>>> I
> >>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>> it's better not to use CAST, but declare a type of an
> >>>>>>>> expression
> >>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>> later
> >>>>>>>>>>>>>>>>>>> on infer the output type of SYSTEM_METADATA. The reason
> >>> is
> >>>> I
> >>>>>>>> think
> >>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>> way
> >>>>>>>>>>>>>>>>>>> it will be easier to implement e.g. filter push downs
> >>> when
> >>>>>>>> working
> >>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> native types of the source, e.g. in case of Kafka's
> >>>> offset, i
> >>>>>>>>>>>> think it's
> >>>>>>>>>>>>>>>>>>> better to pushdown long rather than string. This could
> >>> let
> >>>> us
> >>>>>>>> push
> >>>>>>>>>>>>>>>>>>> expression like e.g. offset > 12345 & offset < 59382.
> >>>>>>>> Otherwise we
> >>>>>>>>>>>> would
> >>>>>>>>>>>>>>>>>>> have to push down cast(offset, long) > 12345 &&
> >>>> cast(offset,
> >>>>>>>> long)
> >>>>>>>>>>>> <
> >>>>>>>>>>>>>>>>>> 59382.
> >>>>>>>>>>>>>>>>>>> Moreover I think we need to introduce the type for
> >>> computed
> >>>>>>>> columns
> >>>>>>>>>>>>>>>>>> anyway
> >>>>>>>>>>>>>>>>>>> to support functions that infer output type based on
> >>>> expected
> >>>>>>>>>>>> return
> >>>>>>>>>>>>>>>>>> type.
> >>>>>>>>>>>>>>>>>>>>> As for the computed column push down. Yes,
> >>>> SYSTEM_METADATA
> >>>>>>>> would
> >>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>> to be pushed down to the source. If it is not possible
> >>> the
> >>>>>>>> planner
> >>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>> fail. As far as I know computed columns push down will
> >> be
> >>>>>> part
> >>>>>>>> of
> >>>>>>>>>>>> source
> >>>>>>>>>>>>>>>>>>> rework, won't it? ;)
> >>>>>>>>>>>>>>>>>>>>> As for the persisted computed column. I think it is
> >>>>>>>> completely
> >>>>>>>>>>>>>>>>>>> orthogonal. In my current proposal you can also
> >> partition
> >>>> by
> >>>>>> a
> >>>>>>>>>>>> computed
> >>>>>>>>>>>>>>>>>>> column. The difference between using a udf in
> >> partitioned
> >>>> by
> >>>>>> vs
> >>>>>>>>>>>>>>>>>> partitioned
> >>>>>>>>>>>>>>>>>>> by a computed column is that when you partition by a
> >>>> computed
> >>>>>>>>>>>> column
> >>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>> column must be also computed when reading the table. If
> >>> you
> >>>>>>>> use a
> >>>>>>>>>>>> udf in
> >>>>>>>>>>>>>>>>>>> the partitioned by, the expression is computed only
> >> when
> >>>>>>>> inserting
> >>>>>>>>>>>> into
> >>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>> table.
> >>>>>>>>>>>>>>>>>>>>> Hope this answers some of your questions. Looking
> >>> forward
> >>>>>> for
> >>>>>>>>>>>> further
> >>>>>>>>>>>>>>>>>>> suggestions.
> >>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 02/03/2020 05:18, Jark Wu wrote:
> >>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks Dawid for starting such a great discussion.
> >>>> Reaing
> >>>>>>>>>>>> metadata
> >>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>> key-part information from source is an important
> >>> feature
> >>>>>> for
> >>>>>>>>>>>>>>>>>> streaming
> >>>>>>>>>>>>>>>>>>>>>> users.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> In general, I agree with the proposal of the FLIP.
> >>>>>>>>>>>>>>>>>>>>>> I will leave my thoughts and comments here:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> 1) +1 to use connector properties instead of
> >>> introducing
> >>>>>>>> HEADER
> >>>>>>>>>>>>>>>>>>> keyword as
> >>>>>>>>>>>>>>>>>>>>>> the reason you mentioned in the FLIP.
> >>>>>>>>>>>>>>>>>>>>>> 2) we already introduced PARTITIONED BY in FLIP-63.
> >>>> Maybe
> >>>>>> we
> >>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>> add a
> >>>>>>>>>>>>>>>>>>>>>> section to explain what's the relationship between
> >>> them.
> >>>>>>>>>>>>>>>>>>>>>> Do their concepts conflict? Could INSERT PARTITION
> >> be
> >>>> used
> >>>>>>>> on
> >>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> PARTITIONED table in this FLIP?
> >>>>>>>>>>>>>>>>>>>>>> 3) Currently, properties are hierarchical in Flink
> >>> SQL.
> >>>>>>>> Shall we
> >>>>>>>>>>>>>>>>>> make
> >>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> new introduced properties more hierarchical?
> >>>>>>>>>>>>>>>>>>>>>> For example, "timestamp" => "connector.timestamp"?
> >>>>>>>> (actually, I
> >>>>>>>>>>>>>>>>>>> prefer
> >>>>>>>>>>>>>>>>>>>>>> "kafka.timestamp" which is another improvement for
> >>>>>>>> properties
> >>>>>>>>>>>>>>>>>>> FLINK-12557)
> >>>>>>>>>>>>>>>>>>>>>> A single "timestamp" in properties may mislead users
> >>>> that
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>> field
> >>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> a rowtime attribute.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> I also left some minor comments in the FLIP.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>>>> Jark
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <
> >>>>>>>>>>>>>>>>>> dwysakow...@apache.org>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I would like to propose an improvement that would
> >>>> enable
> >>>>>>>>>>>> reading
> >>>>>>>>>>>>>>>>>> table
> >>>>>>>>>>>>>>>>>>>>>>> columns from different parts of source records.
> >>> Besides
> >>>>>> the
> >>>>>>>>>>>> main
> >>>>>>>>>>>>>>>>>>> payload
> >>>>>>>>>>>>>>>>>>>>>>> majority (if not all of the sources) expose
> >>> additional
> >>>>>>>>>>>>>>>>>> information. It
> >>>>>>>>>>>>>>>>>>>>>>> can be simply a read-only metadata such as offset,
> >>>>>>>> ingestion
> >>>>>>>>>>>> time
> >>>>>>>>>>>>>>>>>> or a
> >>>>>>>>>>>>>>>>>>>>>>> read and write parts of the record that contain
> >> data
> >>>> but
> >>>>>>>>>>>>>>>>>> additionally
> >>>>>>>>>>>>>>>>>>>>>>> serve different purposes (partitioning, compaction
> >>>> etc.),
> >>>>>>>> e.g.
> >>>>>>>>>>>> key
> >>>>>>>>>>>>>>>>>> or
> >>>>>>>>>>>>>>>>>>>>>>> timestamp in Kafka.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> We should make it possible to read and write data
> >>> from
> >>>>>> all
> >>>>>>>> of
> >>>>>>>>>>>> those
> >>>>>>>>>>>>>>>>>>>>>>> locations. In this proposal I discuss reading
> >>>>>> partitioning
> >>>>>>>>>>>> data,
> >>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>> completeness this proposal discusses also the
> >>>>>> partitioning
> >>>>>>>> when
> >>>>>>>>>>>>>>>>>>> writing
> >>>>>>>>>>>>>>>>>>>>>>> data out.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I am looking forward to your comments.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> You can access the FLIP here:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Dawid
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>

Reply via email to