"offset INT SYSTEM_METADATA("offset")"

This is actually Oracle or MySQL style computed column syntax.

"You are right that one could argue that "timestamp", "headers" are
something like "key" and "value""

I have the same feeling, both key value and headers timestamp are *real* data
stored in the consumed record, they are not computed or generated.

"Trying to solve everything via properties sounds rather like a hack to
me"

Things are not that hack if we can unify the routines or the definitions
(all from the computed column way or all from the table options), i also
think that it is a hacky that we mix in 2 kinds of syntax for different
kinds of metadata (read-only and read-write). In this FLIP, we declare the
Kafka key fields with table options but SYSTEM_METADATA for other metadata,
that is a hacky thing or something in-consistent.

Kurt Young <ykt...@gmail.com> 于2020年9月9日周三 下午4:48写道:

>  I would vote for `offset INT SYSTEM_METADATA("offset")`.
>
> I don't think we can stick with the SQL standard in DDL part forever,
> especially as there are more and more
> requirements coming from different connectors and external systems.
>
> Best,
> Kurt
>
>
> On Wed, Sep 9, 2020 at 4:40 PM Timo Walther <twal...@apache.org> wrote:
>
> > Hi Jark,
> >
> > now we are back at the original design proposed by Dawid :D Yes, we
> > should be cautious about adding new syntax. But the length of this
> > discussion shows that we are looking for a good long-term solution. In
> > this case I would rather vote for a deep integration into the syntax.
> >
> > Computed columns are also not SQL standard compliant. And our DDL is
> > neither, so we have some degree of freedom here.
> >
> > Trying to solve everything via properties sounds rather like a hack to
> > me. You are right that one could argue that "timestamp", "headers" are
> > something like "key" and "value". However, mixing
> >
> > `offset AS SYSTEM_METADATA("offset")`
> >
> > and
> >
> > `'timestamp.field' = 'ts'`
> >
> > looks more confusing to users that an explicit
> >
> > `offset AS CAST(SYSTEM_METADATA("offset") AS INT)`
> >
> > or
> >
> > `offset INT SYSTEM_METADATA("offset")`
> >
> > that is symetric for both source and sink.
> >
> > What do others think?
> >
> > Regards,
> > Timo
> >
> >
> > On 09.09.20 10:09, Jark Wu wrote:
> > > Hi everyone,
> > >
> > > I think we have a conclusion that the writable metadata shouldn't be
> > > defined as a computed column, but a normal column.
> > >
> > > "timestamp STRING SYSTEM_METADATA('timestamp')" is one of the
> approaches.
> > > However, it is not SQL standard compliant, we need to be cautious
> enough
> > > when adding new syntax.
> > > Besides, we have to introduce the `PERSISTED` or `VIRTUAL` keyword to
> > > resolve the query-sink schema problem if it is read-only metadata. That
> > > adds more stuff to learn for users.
> > >
> > >>From my point of view, the "timestamp", "headers" are something like
> > "key"
> > > and "value" that stores with the real data. So why not define the
> > > "timestamp" in the same way with "key" by using a "timestamp.field"
> > > connector option?
> > > On the other side, the read-only metadata, such as "offset", shouldn't
> be
> > > defined as a normal column. So why not use the existing computed column
> > > syntax for such metadata? Then we don't have the query-sink schema
> > problem.
> > > So here is my proposal:
> > >
> > > CREATE TABLE kafka_table (
> > >    id BIGINT,
> > >    name STRING,
> > >    col1 STRING,
> > >    col2 STRING,
> > >    ts TIMESTAMP(3) WITH LOCAL TIME ZONE,    -- ts is a normal field, so
> > can
> > > be read and written.
> > >    offset AS SYSTEM_METADATA("offset")
> > > ) WITH (
> > >    'connector' = 'kafka',
> > >    'topic' = 'test-topic',
> > >    'key.fields' = 'id, name',
> > >    'key.format' = 'csv',
> > >    'value.format' = 'avro',
> > >    'timestamp.field' = 'ts'    -- define the mapping of Kafka timestamp
> > > );
> > >
> > > INSERT INTO kafka_table
> > > SELECT id, name, col1, col2, rowtime FROM another_table;
> > >
> > > I think this can solve all the problems without introducing any new
> > syntax.
> > > The only minor disadvantage is that we separate the definition
> way/syntax
> > > of read-only metadata and read-write fields.
> > > However, I don't think this is a big problem.
> > >
> > > Best,
> > > Jark
> > >
> > >
> > > On Wed, 9 Sep 2020 at 15:09, Timo Walther <twal...@apache.org> wrote:
> > >
> > >> Hi Kurt,
> > >>
> > >> thanks for sharing your opinion. I'm totally up for not reusing
> computed
> > >> columns. I think Jark was a big supporter of this syntax, @Jark are
> you
> > >> fine with this as well? The non-computed column approach was only a
> > >> "slightly rejected alternative".
> > >>
> > >> Furthermore, we would need to think about how such a new design
> > >> influences the LIKE clause though.
> > >>
> > >> However, we should still keep the `PERSISTED` keyword as it influences
> > >> the query->sink schema. If you look at the list of metadata for
> existing
> > >> connectors and formats, we currently offer only two writable metadata
> > >> fields. Otherwise, one would need to declare two tables whenever a
> > >> metadata columns is read (one for the source, one for the sink). This
> > >> can be quite inconvientient e.g. for just reading the topic.
> > >>
> > >> Regards,
> > >> Timo
> > >>
> > >>
> > >> On 09.09.20 08:52, Kurt Young wrote:
> > >>> I also share the concern that reusing the computed column syntax but
> > have
> > >>> different semantics
> > >>> would confuse users a lot.
> > >>>
> > >>> Besides, I think metadata fields are conceptually not the same with
> > >>> computed columns. The metadata
> > >>> field is a connector specific thing and it only contains the
> > information
> > >>> that where does the field come
> > >>> from (during source) or where does the field need to write to (during
> > >>> sink). It's more similar with normal
> > >>> fields, with assumption that all these fields need going to the data
> > >> part.
> > >>>
> > >>> Thus I'm more lean to the rejected alternative that Timo mentioned.
> > And I
> > >>> think we don't need the
> > >>> PERSISTED keyword, SYSTEM_METADATA should be enough.
> > >>>
> > >>> During implementation, the framework only needs to pass such <field,
> > >>> metadata field> information to the
> > >>> connector, and the logic of handling such fields inside the connector
> > >>> should be straightforward.
> > >>>
> > >>> Regarding the downside Timo mentioned:
> > >>>
> > >>>> The disadvantage is that users cannot call UDFs or parse timestamps.
> > >>>
> > >>> I think this is fairly simple to solve. Since the metadata field
> isn't
> > a
> > >>> computed column anymore, we can support
> > >>> referencing such fields in the computed column. For example:
> > >>>
> > >>> CREATE TABLE kafka_table (
> > >>>        id BIGINT,
> > >>>        name STRING,
> > >>>        timestamp STRING SYSTEM_METADATA("timestamp"),  // get the
> > >> timestamp
> > >>> field from metadata
> > >>>        ts AS to_timestamp(timestamp) // normal computed column, parse
> > the
> > >>> string to TIMESTAMP type by using the metadata field
> > >>> ) WITH (
> > >>>       ...
> > >>> )
> > >>>
> > >>> Best,
> > >>> Kurt
> > >>>
> > >>>
> > >>> On Tue, Sep 8, 2020 at 11:57 PM Timo Walther <twal...@apache.org>
> > wrote:
> > >>>
> > >>>> Hi Leonard,
> > >>>>
> > >>>> the only alternative I see is that we introduce a concept that is
> > >>>> completely different to computed columns. This is also mentioned in
> > the
> > >>>> rejected alternative section of the FLIP. Something like:
> > >>>>
> > >>>> CREATE TABLE kafka_table (
> > >>>>        id BIGINT,
> > >>>>        name STRING,
> > >>>>        timestamp INT SYSTEM_METADATA("timestamp") PERSISTED,
> > >>>>        headers MAP<STRING, BYTES> SYSTEM_METADATA("headers")
> PERSISTED
> > >>>> ) WITH (
> > >>>>       ...
> > >>>> )
> > >>>>
> > >>>> This way we would avoid confusion at all and can easily map columns
> to
> > >>>> metadata columns. The disadvantage is that users cannot call UDFs or
> > >>>> parse timestamps. This would need to be done in a real computed
> > column.
> > >>>>
> > >>>> I'm happy about better alternatives.
> > >>>>
> > >>>> Regards,
> > >>>> Timo
> > >>>>
> > >>>>
> > >>>> On 08.09.20 15:37, Leonard Xu wrote:
> > >>>>> HI, Timo
> > >>>>>
> > >>>>> Thanks for driving this FLIP.
> > >>>>>
> > >>>>> Sorry but I have a concern about Writing metadata via
> > DynamicTableSink
> > >>>> section:
> > >>>>>
> > >>>>> CREATE TABLE kafka_table (
> > >>>>>      id BIGINT,
> > >>>>>      name STRING,
> > >>>>>      timestamp AS CAST(SYSTEM_METADATA("timestamp") AS BIGINT)
> > >> PERSISTED,
> > >>>>>      headers AS CAST(SYSTEM_METADATA("headers") AS MAP<STRING,
> > BYTES>)
> > >>>> PERSISTED
> > >>>>> ) WITH (
> > >>>>>      ...
> > >>>>> )
> > >>>>> An insert statement could look like:
> > >>>>>
> > >>>>> INSERT INTO kafka_table VALUES (
> > >>>>>      (1, "ABC", 1599133672, MAP('checksum', computeChecksum(...)))
> > >>>>> )
> > >>>>>
> > >>>>> The proposed INERT syntax does not make sense to me, because it
> > >> contains
> > >>>> computed(generated) column.
> > >>>>> Both SQL server and Postgresql do not allow to insert value to
> > computed
> > >>>> columns even they are persisted, this boke the generated column
> > >> semantics
> > >>>> and may confuse user much.
> > >>>>>
> > >>>>> For SQL server computed column[1]:
> > >>>>>> column_name AS computed_column_expression [ PERSISTED [ NOT NULL ]
> > >> ]...
> > >>>>>> NOTE: A computed column cannot be the target of an INSERT or
> UPDATE
> > >>>> statement.
> > >>>>>
> > >>>>> For Postgresql generated column[2]:
> > >>>>>>     height_in numeric GENERATED ALWAYS AS (height_cm / 2.54)
> STORED
> > >>>>>> NOTE: A generated column cannot be written to directly. In INSERT
> or
> > >>>> UPDATE commands, a value cannot be specified for a generated column,
> > but
> > >>>> the keyword DEFAULT may be specified.
> > >>>>>
> > >>>>> It shouldn't be allowed to set/update value for generated column
> > after
> > >>>> lookup the SQL 2016:
> > >>>>>> <insert statement> ::=
> > >>>>>> INSERT INTO <insertion target> <insert columns and source>
> > >>>>>>
> > >>>>>> If <contextually typed table value constructor> CTTVC is
> specified,
> > >>>> then every <contextually typed row
> > >>>>>> value constructor element> simply contained in CTTVC whose
> > >> positionally
> > >>>> corresponding <column name>
> > >>>>>> in <insert column list> references a column of which some
> underlying
> > >>>> column is a generated column shall
> > >>>>>> be a <default specification>.
> > >>>>>> A <default specification> specifies the default value of some
> > >>>> associated item.
> > >>>>>
> > >>>>>
> > >>>>> [1]
> > >>>>
> > >>
> >
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> > >>>> <
> > >>>>
> > >>
> >
> https://docs.microsoft.com/en-US/sql/t-sql/statements/alter-table-computed-column-definition-transact-sql?view=sql-server-ver15
> > >>>>>
> > >>>>> [2] https://www.postgresql.org/docs/12/ddl-generated-columns.html
> <
> > >>>> https://www.postgresql.org/docs/12/ddl-generated-columns.html>
> > >>>>>
> > >>>>>> 在 2020年9月8日,17:31,Timo Walther <twal...@apache.org> 写道:
> > >>>>>>
> > >>>>>> Hi Jark,
> > >>>>>>
> > >>>>>> according to Flink's and Calcite's casting definition in [1][2]
> > >>>> TIMESTAMP WITH LOCAL TIME ZONE should be castable from BIGINT. If
> not,
> > >> we
> > >>>> will make it possible ;-)
> > >>>>>>
> > >>>>>> I'm aware of DeserializationSchema.getProducedType but I think
> that
> > >>>> this method is actually misplaced. The type should rather be passed
> to
> > >> the
> > >>>> source itself.
> > >>>>>>
> > >>>>>> For our Kafka SQL source, we will also not use this method because
> > the
> > >>>> Kafka source will add own metadata in addition to the
> > >>>> DeserializationSchema. So DeserializationSchema.getProducedType will
> > >> never
> > >>>> be read.
> > >>>>>>
> > >>>>>> For now I suggest to leave out the `DataType` from
> > >>>> DecodingFormat.applyReadableMetadata. Also because the format's
> > physical
> > >>>> type is passed later in `createRuntimeDecoder`. If necessary, it can
> > be
> > >>>> computed manually by consumedType + metadata types. We will provide
> a
> > >>>> metadata utility class for that.
> > >>>>>>
> > >>>>>> Regards,
> > >>>>>> Timo
> > >>>>>>
> > >>>>>>
> > >>>>>> [1]
> > >>>>
> > >>
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L200
> > >>>>>> [2]
> > >>>>
> > >>
> >
> https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql/type/SqlTypeCoercionRule.java#L254
> > >>>>>>
> > >>>>>>
> > >>>>>> On 08.09.20 10:52, Jark Wu wrote:
> > >>>>>>> Hi Timo,
> > >>>>>>> The updated CAST SYSTEM_METADATA behavior sounds good to me. I
> just
> > >>>> noticed
> > >>>>>>> that a BIGINT can't be converted to "TIMESTAMP(3) WITH LOCAL TIME
> > >>>> ZONE".
> > >>>>>>> So maybe we need to support this, or use "TIMESTAMP(3) WITH LOCAL
> > >> TIME
> > >>>>>>> ZONE" as the defined type of Kafka timestamp? I think this makes
> > >> sense,
> > >>>>>>> because it represents the milli-seconds since epoch.
> > >>>>>>> Regarding "DeserializationSchema doesn't need TypeInfo", I don't
> > >> think
> > >>>> so.
> > >>>>>>> The DeserializationSchema implements ResultTypeQueryable, thus
> the
> > >>>>>>> implementation needs to return an output TypeInfo.
> > >>>>>>> Besides, FlinkKafkaConsumer also
> > >>>>>>> calls DeserializationSchema.getProducedType as the produced type
> of
> > >> the
> > >>>>>>> source function [1].
> > >>>>>>> Best,
> > >>>>>>> Jark
> > >>>>>>> [1]:
> > >>>>>>>
> > >>>>
> > >>
> >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L1066
> > >>>>>>> On Tue, 8 Sep 2020 at 16:35, Timo Walther <twal...@apache.org>
> > >> wrote:
> > >>>>>>>> Hi everyone,
> > >>>>>>>>
> > >>>>>>>> I updated the FLIP again and hope that I could address the
> > mentioned
> > >>>>>>>> concerns.
> > >>>>>>>>
> > >>>>>>>> @Leonard: Thanks for the explanation. I wasn't aware that ts_ms
> > and
> > >>>>>>>> source.ts_ms have different semantics. I updated the FLIP and
> > expose
> > >>>> the
> > >>>>>>>> most commonly used properties separately. So frequently used
> > >>>> properties
> > >>>>>>>> are not hidden in the MAP anymore:
> > >>>>>>>>
> > >>>>>>>> debezium-json.ingestion-timestamp
> > >>>>>>>> debezium-json.source.timestamp
> > >>>>>>>> debezium-json.source.database
> > >>>>>>>> debezium-json.source.schema
> > >>>>>>>> debezium-json.source.table
> > >>>>>>>>
> > >>>>>>>> However, since other properties depend on the used
> > connector/vendor,
> > >>>> the
> > >>>>>>>> remaining options are stored in:
> > >>>>>>>>
> > >>>>>>>> debezium-json.source.properties
> > >>>>>>>>
> > >>>>>>>> And accessed with:
> > >>>>>>>>
> > >>>>>>>> CAST(SYSTEM_METADATA('debezium-json.source.properties') AS
> > >> MAP<STRING,
> > >>>>>>>> STRING>)['table']
> > >>>>>>>>
> > >>>>>>>> Otherwise it is not possible to figure out the value and column
> > type
> > >>>>>>>> during validation.
> > >>>>>>>>
> > >>>>>>>> @Jark: You convinced me in relaxing the CAST constraints. I
> added
> > a
> > >>>>>>>> dedicacated sub-section to the FLIP:
> > >>>>>>>>
> > >>>>>>>> For making the use of SYSTEM_METADATA easier and avoid nested
> > >> casting
> > >>>> we
> > >>>>>>>> allow explicit casting to a target data type:
> > >>>>>>>>
> > >>>>>>>> rowtime AS CAST(SYSTEM_METADATA("timestamp") AS TIMESTAMP(3)
> WITH
> > >>>> LOCAL
> > >>>>>>>> TIME ZONE)
> > >>>>>>>>
> > >>>>>>>> A connector still produces and consumes the data type returned
> by
> > >>>>>>>> `listMetadata()`. The planner will insert necessary explicit
> > casts.
> > >>>>>>>>
> > >>>>>>>> In any case, the user must provide a CAST such that the computed
> > >>>> column
> > >>>>>>>> receives a valid data type when constructing the table schema.
> > >>>>>>>>
> > >>>>>>>> "I don't see a reason why `DecodingFormat#applyReadableMetadata`
> > >>>> needs a
> > >>>>>>>> DataType argument."
> > >>>>>>>>
> > >>>>>>>> Correct he DeserializationSchema doesn't need TypeInfo, it is
> > always
> > >>>>>>>> executed locally. It is the source that needs TypeInfo for
> > >> serializing
> > >>>>>>>> the record to the next operator. And that's this is what we
> > provide.
> > >>>>>>>>
> > >>>>>>>> @Danny:
> > >>>>>>>>
> > >>>>>>>> “SYSTEM_METADATA("offset")` returns the NULL type by default”
> > >>>>>>>>
> > >>>>>>>> We can also use some other means to represent an UNKNOWN data
> > type.
> > >> In
> > >>>>>>>> the Flink type system, we use the NullType for it. The important
> > >> part
> > >>>> is
> > >>>>>>>> that the final data type is known for the entire computed
> column.
> > >> As I
> > >>>>>>>> mentioned before, I would avoid the suggested option b) that
> would
> > >> be
> > >>>>>>>> similar to your suggestion. The CAST should be enough and allows
> > for
> > >>>>>>>> complex expressions in the computed column. Option b) would need
> > >>>> parser
> > >>>>>>>> changes.
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>> Timo
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On 08.09.20 06:21, Leonard Xu wrote:
> > >>>>>>>>> Hi, Timo
> > >>>>>>>>>
> > >>>>>>>>> Thanks for you explanation and update,  I have only one
> question
> > >> for
> > >>>>>>>> the latest FLIP.
> > >>>>>>>>>
> > >>>>>>>>> About the MAP<STRING, STRING> DataType of key
> > >>>> 'debezium-json.source', if
> > >>>>>>>> user want to use the table name metadata, they need to write:
> > >>>>>>>>> tableName STRING AS CAST(SYSTEM_METADATA('debeuim-json.source')
> > AS
> > >>>>>>>> MAP<STRING, STRING>)['table']
> > >>>>>>>>>
> > >>>>>>>>> the expression is a little complex for user, Could we only
> > support
> > >>>>>>>> necessary metas with simple DataType as following?
> > >>>>>>>>> tableName STRING AS
> > >>>> CAST(SYSTEM_METADATA('debeuim-json.source.table') AS
> > >>>>>>>> STRING),
> > >>>>>>>>> transactionTime LONG AS
> > >>>>>>>> CAST(SYSTEM_METADATA('debeuim-json.source.ts_ms') AS BIGINT),
> > >>>>>>>>>
> > >>>>>>>>> In this way, we can simplify the expression, the mainly used
> > >>>> metadata in
> > >>>>>>>> changelog format may include
> > >>>> 'database','table','source.ts_ms','ts_ms' from
> > >>>>>>>> my side,
> > >>>>>>>>> maybe we could only support them at first version.
> > >>>>>>>>>
> > >>>>>>>>> Both Debezium and Canal have above four metadata, and I‘m
> willing
> > >> to
> > >>>>>>>> take some subtasks in next development if necessary.
> > >>>>>>>>>
> > >>>>>>>>> Debezium:
> > >>>>>>>>> {
> > >>>>>>>>>       "before": null,
> > >>>>>>>>>       "after": {  "id": 101,"name": "scooter"},
> > >>>>>>>>>       "source": {
> > >>>>>>>>>         "db": "inventory",                  # 1. database name
> > the
> > >>>>>>>> changelog belongs to.
> > >>>>>>>>>         "table": "products",                # 2. table name the
> > >>>> changelog
> > >>>>>>>> belongs to.
> > >>>>>>>>>         "ts_ms": 1589355504100,             # 3. timestamp of
> the
> > >>>> change
> > >>>>>>>> happened in database system, i.e.: transaction time in database.
> > >>>>>>>>>         "connector": "mysql",
> > >>>>>>>>>         ….
> > >>>>>>>>>       },
> > >>>>>>>>>       "ts_ms": 1589355606100,              # 4. timestamp when
> > the
> > >>>> debezium
> > >>>>>>>> processed the changelog.
> > >>>>>>>>>       "op": "c",
> > >>>>>>>>>       "transaction": null
> > >>>>>>>>> }
> > >>>>>>>>>
> > >>>>>>>>> Canal:
> > >>>>>>>>> {
> > >>>>>>>>>       "data": [{  "id": "102", "name": "car battery" }],
> > >>>>>>>>>       "database": "inventory",      # 1. database name the
> > changelog
> > >>>>>>>> belongs to.
> > >>>>>>>>>       "table": "products",          # 2. table name the
> changelog
> > >>>> belongs
> > >>>>>>>> to.
> > >>>>>>>>>       "es": 1589374013000,          # 3. execution time of the
> > >> change
> > >>>> in
> > >>>>>>>> database system, i.e.: transaction time in database.
> > >>>>>>>>>       "ts": 1589374013680,          # 4. timestamp when the
> > cannal
> > >>>>>>>> processed the changelog.
> > >>>>>>>>>       "isDdl": false,
> > >>>>>>>>>       "mysqlType": {},
> > >>>>>>>>>       ....
> > >>>>>>>>> }
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Best
> > >>>>>>>>> Leonard
> > >>>>>>>>>
> > >>>>>>>>>> 在 2020年9月8日,11:57,Danny Chan <yuzhao....@gmail.com> 写道:
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks Timo ~
> > >>>>>>>>>>
> > >>>>>>>>>> The FLIP was already in pretty good shape, I have only 2
> > questions
> > >>>> here:
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 1. “`CAST(SYSTEM_METADATA("offset") AS INT)` would be a valid
> > >>>> read-only
> > >>>>>>>> computed column for Kafka and can be extracted by the planner.”
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> What is the pros we follow the SQL-SERVER syntax here ?
> Usually
> > an
> > >>>>>>>> expression return type can be inferred automatically. But I
> guess
> > >>>>>>>> SQL-SERVER does not have function like SYSTEM_METADATA which
> > >> actually
> > >>>> does
> > >>>>>>>> not have a specific return type.
> > >>>>>>>>>>
> > >>>>>>>>>> And why not use the Oracle or MySQL syntax there ?
> > >>>>>>>>>>
> > >>>>>>>>>> column_name [datatype] [GENERATED ALWAYS] AS (expression)
> > >> [VIRTUAL]
> > >>>>>>>>>> Which is more straight-forward.
> > >>>>>>>>>>
> > >>>>>>>>>> 2. “SYSTEM_METADATA("offset")` returns the NULL type by
> default”
> > >>>>>>>>>>
> > >>>>>>>>>> The default type should not be NULL because only NULL literal
> > does
> > >>>>>>>> that. Usually we use ANY as the type if we do not know the
> > specific
> > >>>> type in
> > >>>>>>>> the SQL context. ANY means the physical value can be any java
> > >> object.
> > >>>>>>>>>>
> > >>>>>>>>>> [1]
> https://oracle-base.com/articles/11g/virtual-columns-11gr1
> > >>>>>>>>>> [2]
> > >>>>>>>>
> > >>>>
> > >>
> >
> https://dev.mysql.com/doc/refman/5.7/en/create-table-generated-columns.html
> > >>>>>>>>>>
> > >>>>>>>>>> Best,
> > >>>>>>>>>> Danny Chan
> > >>>>>>>>>> 在 2020年9月4日 +0800 PM4:48,Timo Walther <twal...@apache.org
> >,写道:
> > >>>>>>>>>>> Hi everyone,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I completely reworked FLIP-107. It now covers the full story
> > how
> > >> to
> > >>>>>>>> read
> > >>>>>>>>>>> and write metadata from/to connectors and formats. It
> considers
> > >>>> all of
> > >>>>>>>>>>> the latest FLIPs, namely FLIP-95, FLIP-132 and FLIP-122. It
> > >>>> introduces
> > >>>>>>>>>>> the concept of PERSISTED computed columns and leaves out
> > >>>> partitioning
> > >>>>>>>>>>> for now.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Looking forward to your feedback.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Regards,
> > >>>>>>>>>>> Timo
> > >>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>>> On 04.03.20 09:45, Kurt Young wrote:
> > >>>>>>>>>>>> Sorry, forgot one question.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 4. Can we make the value.fields-include more orthogonal?
> Like
> > >> one
> > >>>> can
> > >>>>>>>>>>>> specify it as "EXCEPT_KEY, EXCEPT_TIMESTAMP".
> > >>>>>>>>>>>> With current EXCEPT_KEY and EXCEPT_KEY_TIMESTAMP, users can
> > not
> > >>>>>>>> config to
> > >>>>>>>>>>>> just ignore timestamp but keep key.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Best,
> > >>>>>>>>>>>> Kurt
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Wed, Mar 4, 2020 at 4:42 PM Kurt Young <ykt...@gmail.com
> >
> > >>>> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi Dawid,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I have a couple of questions around key fields, actually I
> > also
> > >>>> have
> > >>>>>>>> some
> > >>>>>>>>>>>>> other questions but want to be focused on key fields first.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 1. I don't fully understand the usage of "key.fields". Is
> > this
> > >>>>>>>> option only
> > >>>>>>>>>>>>> valid during write operation? Because for
> > >>>>>>>>>>>>> reading, I can't imagine how such options can be applied. I
> > >> would
> > >>>>>>>> expect
> > >>>>>>>>>>>>> that there might be a SYSTEM_METADATA("key")
> > >>>>>>>>>>>>> to read and assign the key to a normal field?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 2. If "key.fields" is only valid in write operation, I want
> > to
> > >>>>>>>> propose we
> > >>>>>>>>>>>>> can simplify the options to not introducing key.format.type
> > and
> > >>>>>>>>>>>>> other related options. I think a single "key.field" (not
> > >> fields)
> > >>>>>>>> would be
> > >>>>>>>>>>>>> enough, users can use UDF to calculate whatever key they
> > >>>>>>>>>>>>> want before sink.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> 3. Also I don't want to introduce "value.format.type" and
> > >>>>>>>>>>>>> "value.format.xxx" with the "value" prefix. Not every
> > connector
> > >>>> has a
> > >>>>>>>>>>>>> concept
> > >>>>>>>>>>>>> of key and values. The old parameter "format.type" already
> > good
> > >>>>>>>> enough to
> > >>>>>>>>>>>>> use.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>> Kurt
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Tue, Mar 3, 2020 at 10:40 PM Jark Wu <imj...@gmail.com>
> > >>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thanks Dawid,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> I have two more questions.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> SupportsMetadata
> > >>>>>>>>>>>>>> Introducing SupportsMetadata sounds good to me. But I have
> > >> some
> > >>>>>>>> questions
> > >>>>>>>>>>>>>> regarding to this interface.
> > >>>>>>>>>>>>>> 1) How do the source know what the expected return type of
> > >> each
> > >>>>>>>> metadata?
> > >>>>>>>>>>>>>> 2) Where to put the metadata fields? Append to the
> existing
> > >>>> physical
> > >>>>>>>>>>>>>> fields?
> > >>>>>>>>>>>>>> If yes, I would suggest to change the signature to
> > >> `TableSource
> > >>>>>>>>>>>>>> appendMetadataFields(String[] metadataNames, DataType[]
> > >>>>>>>> metadataTypes)`
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> SYSTEM_METADATA("partition")
> > >>>>>>>>>>>>>> Can SYSTEM_METADATA() function be used nested in a
> computed
> > >>>> column
> > >>>>>>>>>>>>>> expression? If yes, how to specify the return type of
> > >>>>>>>> SYSTEM_METADATA?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Tue, 3 Mar 2020 at 17:06, Dawid Wysakowicz <
> > >>>>>>>> dwysakow...@apache.org>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 1. I thought a bit more on how the source would emit the
> > >>>> columns
> > >>>>>>>> and I
> > >>>>>>>>>>>>>>> now see its not exactly the same as regular columns. I
> see
> > a
> > >>>> need
> > >>>>>>>> to
> > >>>>>>>>>>>>>>> elaborate a bit more on that in the FLIP as you asked,
> > Jark.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I do agree mostly with Danny on how we should do that.
> One
> > >>>>>>>> additional
> > >>>>>>>>>>>>>>> things I would introduce is an
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> interface SupportsMetadata {
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> boolean supportsMetadata(Set<String> metadataFields);
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> TableSource generateMetadataFields(Set<String>
> > >> metadataFields);
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> This way the source would have to declare/emit only the
> > >>>> requested
> > >>>>>>>>>>>>>>> metadata fields. In order not to clash with user defined
> > >>>> fields.
> > >>>>>>>> When
> > >>>>>>>>>>>>>>> emitting the metadata field I would prepend the column
> name
> > >>>> with
> > >>>>>>>>>>>>>>> __system_{property_name}. Therefore when requested
> > >>>>>>>>>>>>>>> SYSTEM_METADATA("partition") the source would append a
> > field
> > >>>>>>>>>>>>>>> __system_partition to the schema. This would be never
> > visible
> > >>>> to
> > >>>>>>>> the
> > >>>>>>>>>>>>>>> user as it would be used only for the subsequent computed
> > >>>> columns.
> > >>>>>>>> If
> > >>>>>>>>>>>>>>> that makes sense to you, I will update the FLIP with this
> > >>>>>>>> description.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 2. CAST vs explicit type in computed columns
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Here I agree with Danny. It is also the current state of
> > the
> > >>>>>>>> proposal.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 3. Partitioning on computed column vs function
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Here I also agree with Danny. I also think those are
> > >>>> orthogonal. I
> > >>>>>>>> would
> > >>>>>>>>>>>>>>> leave out the STORED computed columns out of the
> > discussion.
> > >> I
> > >>>>>>>> don't see
> > >>>>>>>>>>>>>>> how do they relate to the partitioning. I already put
> both
> > of
> > >>>> those
> > >>>>>>>>>>>>>>> cases in the document. We can either partition on a
> > computed
> > >>>>>>>> column or
> > >>>>>>>>>>>>>>> use a udf in a partioned by clause. I am fine with
> leaving
> > >> out
> > >>>> the
> > >>>>>>>>>>>>>>> partitioning by udf in the first version if you still
> have
> > >> some
> > >>>>>>>>>>>>>> concerns.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> As for your question Danny. It depends which partitioning
> > >>>> strategy
> > >>>>>>>> you
> > >>>>>>>>>>>>>> use.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> For the HASH partitioning strategy I thought it would
> work
> > as
> > >>>> you
> > >>>>>>>>>>>>>>> explained. It would be N = MOD(expr, num). I am not sure
> > >>>> though if
> > >>>>>>>> we
> > >>>>>>>>>>>>>>> should introduce the PARTITIONS clause. Usually Flink
> does
> > >> not
> > >>>> own
> > >>>>>>>> the
> > >>>>>>>>>>>>>>> data and the partitions are already an intrinsic property
> > of
> > >>>> the
> > >>>>>>>>>>>>>>> underlying source e.g. for kafka we do not create topics,
> > but
> > >>>> we
> > >>>>>>>> just
> > >>>>>>>>>>>>>>> describe pre-existing pre-partitioned topic.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> 4. timestamp vs timestamp.field vs connector.field vs ...
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I am fine with changing it to timestamp.field to be
> > >> consistent
> > >>>> with
> > >>>>>>>>>>>>>>> other value.fields and key.fields. Actually that was also
> > my
> > >>>>>>>> initial
> > >>>>>>>>>>>>>>> proposal in a first draft I prepared. I changed it
> > afterwards
> > >>>> to
> > >>>>>>>> shorten
> > >>>>>>>>>>>>>>> the key.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Dawid
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 03/03/2020 09:00, Danny Chan wrote:
> > >>>>>>>>>>>>>>>> Thanks Dawid for bringing up this discussion, I think it
> > is
> > >> a
> > >>>>>>>> useful
> > >>>>>>>>>>>>>>> feature ~
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> About how the metadata outputs from source
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I think it is completely orthogonal, computed column
> push
> > >>>> down is
> > >>>>>>>>>>>>>>> another topic, this should not be a blocker but a
> > promotion,
> > >>>> if we
> > >>>>>>>> do
> > >>>>>>>>>>>>>> not
> > >>>>>>>>>>>>>>> have any filters on the computed column, there is no need
> > to
> > >>>> do any
> > >>>>>>>>>>>>>>> pushings; the source node just emit the complete record
> > with
> > >>>> full
> > >>>>>>>>>>>>>> metadata
> > >>>>>>>>>>>>>>> with the declared physical schema, then when generating
> the
> > >>>> virtual
> > >>>>>>>>>>>>>>> columns, we would extract the metadata info and output as
> > >> full
> > >>>>>>>>>>>>>> columns(with
> > >>>>>>>>>>>>>>> full schema).
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> About the type of metadata column
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Personally i prefer explicit type instead of CAST, they
> > are
> > >>>>>>>> symantic
> > >>>>>>>>>>>>>>> equivalent though, explict type is more straight-forward
> > and
> > >>>> we can
> > >>>>>>>>>>>>>> declare
> > >>>>>>>>>>>>>>> the nullable attribute there.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> About option A: partitioning based on acomputed column
> VS
> > >>>> option
> > >>>>>>>> B:
> > >>>>>>>>>>>>>>> partitioning with just a function
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>     From the FLIP, it seems that B's partitioning is
> just
> > a
> > >>>> strategy
> > >>>>>>>> when
> > >>>>>>>>>>>>>>> writing data, the partiton column is not included in the
> > >> table
> > >>>>>>>> schema,
> > >>>>>>>>>>>>>> so
> > >>>>>>>>>>>>>>> it's just useless when reading from that.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> - Compared to A, we do not need to generate the
> partition
> > >>>> column
> > >>>>>>>> when
> > >>>>>>>>>>>>>>> selecting from the table(but insert into)
> > >>>>>>>>>>>>>>>> - For A we can also mark the column as STORED when we
> want
> > >> to
> > >>>>>>>> persist
> > >>>>>>>>>>>>>>> that
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> So in my opition they are orthogonal, we can support
> > both, i
> > >>>> saw
> > >>>>>>>> that
> > >>>>>>>>>>>>>>> MySQL/Oracle[1][2] would suggest to also define the
> > >> PARTITIONS
> > >>>>>>>> num, and
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> partitions are managed under a "tablenamespace", the
> > >> partition
> > >>>> in
> > >>>>>>>> which
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> record is stored is partition number N, where N =
> MOD(expr,
> > >>>> num),
> > >>>>>>>> for
> > >>>>>>>>>>>>>> your
> > >>>>>>>>>>>>>>> design, which partiton the record would persist ?
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> [1]
> > >>>>>>>> https://dev.mysql.com/doc/refman/5.7/en/partitioning-hash.html
> > >>>>>>>>>>>>>>>> [2]
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>
> > >>>>
> > >>
> >
> https://docs.oracle.com/database/121/VLDBG/GUID-F023D3ED-262F-4B19-950A-D3C8F8CDB4F4.htm#VLDBG1270
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>> Danny Chan
> > >>>>>>>>>>>>>>>> 在 2020年3月2日 +0800 PM6:16,Dawid Wysakowicz <
> > >>>> dwysakow...@apache.org
> > >>>>>>>>>>>>>>> ,写道:
> > >>>>>>>>>>>>>>>>> Hi Jark,
> > >>>>>>>>>>>>>>>>> Ad. 2 I added a section to discuss relation to FLIP-63
> > >>>>>>>>>>>>>>>>> Ad. 3 Yes, I also tried to somewhat keep hierarchy of
> > >>>> properties.
> > >>>>>>>>>>>>>>> Therefore you have the key.format.type.
> > >>>>>>>>>>>>>>>>> I also considered exactly what you are suggesting
> > >> (prefixing
> > >>>> with
> > >>>>>>>>>>>>>>> connector or kafka). I should've put that into an
> > >>>> Option/Rejected
> > >>>>>>>>>>>>>>> alternatives.
> > >>>>>>>>>>>>>>>>> I agree timestamp, key.*, value.* are connector
> > properties.
> > >>>> Why I
> > >>>>>>>>>>>>>>> wanted to suggest not adding that prefix in the first
> > version
> > >>>> is
> > >>>>>>>> that
> > >>>>>>>>>>>>>>> actually all the properties in the WITH section are
> > connector
> > >>>>>>>>>>>>>> properties.
> > >>>>>>>>>>>>>>> Even format is in the end a connector property as some of
> > the
> > >>>>>>>> sources
> > >>>>>>>>>>>>>> might
> > >>>>>>>>>>>>>>> not have a format, imo. The benefit of not adding the
> > prefix
> > >> is
> > >>>>>>>> that it
> > >>>>>>>>>>>>>>> makes the keys a bit shorter. Imagine prefixing all the
> > >>>> properties
> > >>>>>>>> with
> > >>>>>>>>>>>>>>> connector (or if we go with FLINK-12557: elasticsearch):
> > >>>>>>>>>>>>>>>>> elasticsearch.key.format.type: csv
> > >>>>>>>>>>>>>>>>> elasticsearch.key.format.field: ....
> > >>>>>>>>>>>>>>>>> elasticsearch.key.format.delimiter: ....
> > >>>>>>>>>>>>>>>>> elasticsearch.key.format.*: ....
> > >>>>>>>>>>>>>>>>> I am fine with doing it though if this is a preferred
> > >>>> approach
> > >>>>>>>> in the
> > >>>>>>>>>>>>>>> community.
> > >>>>>>>>>>>>>>>>> Ad in-line comments:
> > >>>>>>>>>>>>>>>>> I forgot to update the `value.fields.include` property.
> > It
> > >>>>>>>> should be
> > >>>>>>>>>>>>>>> value.fields-include. Which I think you also suggested in
> > the
> > >>>>>>>> comment,
> > >>>>>>>>>>>>>>> right?
> > >>>>>>>>>>>>>>>>> As for the cast vs declaring output type of computed
> > >> column.
> > >>>> I
> > >>>>>>>> think
> > >>>>>>>>>>>>>>> it's better not to use CAST, but declare a type of an
> > >>>> expression
> > >>>>>>>> and
> > >>>>>>>>>>>>>> later
> > >>>>>>>>>>>>>>> on infer the output type of SYSTEM_METADATA. The reason
> is
> > I
> > >>>> think
> > >>>>>>>> this
> > >>>>>>>>>>>>>> way
> > >>>>>>>>>>>>>>> it will be easier to implement e.g. filter push downs
> when
> > >>>> working
> > >>>>>>>> with
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> native types of the source, e.g. in case of Kafka's
> > offset, i
> > >>>>>>>> think it's
> > >>>>>>>>>>>>>>> better to pushdown long rather than string. This could
> let
> > us
> > >>>> push
> > >>>>>>>>>>>>>>> expression like e.g. offset > 12345 & offset < 59382.
> > >>>> Otherwise we
> > >>>>>>>> would
> > >>>>>>>>>>>>>>> have to push down cast(offset, long) > 12345 &&
> > cast(offset,
> > >>>> long)
> > >>>>>>>> <
> > >>>>>>>>>>>>>> 59382.
> > >>>>>>>>>>>>>>> Moreover I think we need to introduce the type for
> computed
> > >>>> columns
> > >>>>>>>>>>>>>> anyway
> > >>>>>>>>>>>>>>> to support functions that infer output type based on
> > expected
> > >>>>>>>> return
> > >>>>>>>>>>>>>> type.
> > >>>>>>>>>>>>>>>>> As for the computed column push down. Yes,
> > SYSTEM_METADATA
> > >>>> would
> > >>>>>>>> have
> > >>>>>>>>>>>>>>> to be pushed down to the source. If it is not possible
> the
> > >>>> planner
> > >>>>>>>>>>>>>> should
> > >>>>>>>>>>>>>>> fail. As far as I know computed columns push down will be
> > >> part
> > >>>> of
> > >>>>>>>> source
> > >>>>>>>>>>>>>>> rework, won't it? ;)
> > >>>>>>>>>>>>>>>>> As for the persisted computed column. I think it is
> > >>>> completely
> > >>>>>>>>>>>>>>> orthogonal. In my current proposal you can also partition
> > by
> > >> a
> > >>>>>>>> computed
> > >>>>>>>>>>>>>>> column. The difference between using a udf in partitioned
> > by
> > >> vs
> > >>>>>>>>>>>>>> partitioned
> > >>>>>>>>>>>>>>> by a computed column is that when you partition by a
> > computed
> > >>>>>>>> column
> > >>>>>>>>>>>>>> this
> > >>>>>>>>>>>>>>> column must be also computed when reading the table. If
> you
> > >>>> use a
> > >>>>>>>> udf in
> > >>>>>>>>>>>>>>> the partitioned by, the expression is computed only when
> > >>>> inserting
> > >>>>>>>> into
> > >>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>> table.
> > >>>>>>>>>>>>>>>>> Hope this answers some of your questions. Looking
> forward
> > >> for
> > >>>>>>>> further
> > >>>>>>>>>>>>>>> suggestions.
> > >>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>> Dawid
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>> On 02/03/2020 05:18, Jark Wu wrote:
> > >>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thanks Dawid for starting such a great discussion.
> > Reaing
> > >>>>>>>> metadata
> > >>>>>>>>>>>>>> and
> > >>>>>>>>>>>>>>>>>> key-part information from source is an important
> feature
> > >> for
> > >>>>>>>>>>>>>> streaming
> > >>>>>>>>>>>>>>>>>> users.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> In general, I agree with the proposal of the FLIP.
> > >>>>>>>>>>>>>>>>>> I will leave my thoughts and comments here:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> 1) +1 to use connector properties instead of
> introducing
> > >>>> HEADER
> > >>>>>>>>>>>>>>> keyword as
> > >>>>>>>>>>>>>>>>>> the reason you mentioned in the FLIP.
> > >>>>>>>>>>>>>>>>>> 2) we already introduced PARTITIONED BY in FLIP-63.
> > Maybe
> > >> we
> > >>>>>>>> should
> > >>>>>>>>>>>>>>> add a
> > >>>>>>>>>>>>>>>>>> section to explain what's the relationship between
> them.
> > >>>>>>>>>>>>>>>>>> Do their concepts conflict? Could INSERT PARTITION be
> > used
> > >>>> on
> > >>>>>>>> the
> > >>>>>>>>>>>>>>>>>> PARTITIONED table in this FLIP?
> > >>>>>>>>>>>>>>>>>> 3) Currently, properties are hierarchical in Flink
> SQL.
> > >>>> Shall we
> > >>>>>>>>>>>>>> make
> > >>>>>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>>>> new introduced properties more hierarchical?
> > >>>>>>>>>>>>>>>>>> For example, "timestamp" => "connector.timestamp"?
> > >>>> (actually, I
> > >>>>>>>>>>>>>>> prefer
> > >>>>>>>>>>>>>>>>>> "kafka.timestamp" which is another improvement for
> > >>>> properties
> > >>>>>>>>>>>>>>> FLINK-12557)
> > >>>>>>>>>>>>>>>>>> A single "timestamp" in properties may mislead users
> > that
> > >>>> the
> > >>>>>>>>>>>>>> field
> > >>>>>>>>>>>>>>> is
> > >>>>>>>>>>>>>>>>>> a rowtime attribute.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> I also left some minor comments in the FLIP.
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>>>>>>> Jark
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>> On Sun, 1 Mar 2020 at 22:30, Dawid Wysakowicz <
> > >>>>>>>>>>>>>> dwysakow...@apache.org>
> > >>>>>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Hi,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I would like to propose an improvement that would
> > enable
> > >>>>>>>> reading
> > >>>>>>>>>>>>>> table
> > >>>>>>>>>>>>>>>>>>> columns from different parts of source records.
> Besides
> > >> the
> > >>>>>>>> main
> > >>>>>>>>>>>>>>> payload
> > >>>>>>>>>>>>>>>>>>> majority (if not all of the sources) expose
> additional
> > >>>>>>>>>>>>>> information. It
> > >>>>>>>>>>>>>>>>>>> can be simply a read-only metadata such as offset,
> > >>>> ingestion
> > >>>>>>>> time
> > >>>>>>>>>>>>>> or a
> > >>>>>>>>>>>>>>>>>>> read and write parts of the record that contain data
> > but
> > >>>>>>>>>>>>>> additionally
> > >>>>>>>>>>>>>>>>>>> serve different purposes (partitioning, compaction
> > etc.),
> > >>>> e.g.
> > >>>>>>>> key
> > >>>>>>>>>>>>>> or
> > >>>>>>>>>>>>>>>>>>> timestamp in Kafka.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> We should make it possible to read and write data
> from
> > >> all
> > >>>> of
> > >>>>>>>> those
> > >>>>>>>>>>>>>>>>>>> locations. In this proposal I discuss reading
> > >> partitioning
> > >>>>>>>> data,
> > >>>>>>>>>>>>>> for
> > >>>>>>>>>>>>>>>>>>> completeness this proposal discusses also the
> > >> partitioning
> > >>>> when
> > >>>>>>>>>>>>>>> writing
> > >>>>>>>>>>>>>>>>>>> data out.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> I am looking forward to your comments.
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> You can access the FLIP here:
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>
> > >>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records?src=contextnavpagetreemode
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Best,
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>> Dawid
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>

Reply via email to