I'm wishing we had column DEFAULTs as was discussed in this thread way back in 2022.
I found FLIP-261 [1], but it doesn't appear to have been voted on or implemented. Is that correct?

[1] https://cwiki.apache.org/confluence/x/x4ueDQ

Regards,
David

On Thu, Sep 1, 2022 at 1:32 AM Ran Tao <[email protected]> wrote:

> Hi, Jark & Timo. I'm glad to support this feature, and if you agree,
> I'll be ready to create a FLIP, and then you and other developers can
> review and check the specifics.
>
> Thanks.
>
> On Tue, Aug 30, 2022 at 20:24, Jark Wu <[email protected]> wrote:
>
> > Thank you Ran for the explanation.
> >
> > The column DEFAULT is a reasonable feature and can also help in other
> > cases. I'm fine with adding this feature.
> > Do you want to prepare a FLIP for it?
> >
> > Best,
> > Jark
> >
> > > On Aug 29, 2022, at 15:02, Ran Tao <[email protected]> wrote:
> > >
> > > Hi Jark. Timo summed it up very well. In fact, my problem is that the
> > > current Flink table metadata is fixed and cannot accommodate changes
> > > in a connector's metadata columns.
> > > A metadata column that did not exist in the past may exist at some
> > > point in the future, and vice versa.
> > > There is forward and backward compatibility to consider here.
> > >
> > > On Fri, Aug 26, 2022 at 16:28, Jark Wu <[email protected]> wrote:
> > >
> > >> Hi Ran,
> > >>
> > >> If the metadata comes from the message properties, then you can
> > >> manually cast it to your preferred type, such as
> > >> `my_dynamic_meta AS CAST(properties['my-new-property'] AS TIMESTAMP)`.
> > >>
> > >> If the metadata is not from the message properties, how does the
> > >> connector know which field to convert from?
> > >> Shouldn't the connector be modified to support this new metadata
> > >> column?
> > >>
> > >> Best,
> > >> Jark
> > >>
> > >>
> > >>> On Aug 26, 2022, at 15:30, Ran Tao <[email protected]> wrote:
> > >>>
> > >>> Hi, Timo. I think using one map column in the Debezium format you
> > >>> illustrated above can't cover the discussed scenario.
> > >>> It's not the same thing.
> > >>>
> > >>> Here is a Debezium format example from the Flink docs: [1]
> > >>>
> > >>> ```
> > >>> CREATE TABLE KafkaTable (
> > >>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> > >>>   origin_properties MAP<STRING, STRING> METADATA FROM
> > >>>     'value.source.properties' VIRTUAL,
> > >>>   user_id BIGINT
> > >>> ) WITH (
> > >>>   'connector' = 'kafka',
> > >>>   'value.format' = 'debezium-json'
> > >>>   ...
> > >>> );
> > >>> ```
> > >>>
> > >>> The `origin_properties` column holds the properties, so we define it
> > >>> as a MAP (just as you suggested). But the other metadata columns have
> > >>> their own data types, e.g. `origin_ts` is TIMESTAMP. We cannot
> > >>> flatten all metadata columns into one MAP<STRING, STRING> column;
> > >>> it's not a good idea.
> > >>>
> > >>> My suggestion covers the case where the Kafka connector above adds
> > >>> new metadata (just as an example; Kafka may be stable, but another
> > >>> connector or middleware might still be evolving, so its metadata
> > >>> could be added or changed). Say that at some point Kafka adds a
> > >>> `host_name` metadata key (indicating the address of the message
> > >>> broker).
> > >>>
> > >>> We could define the SQL like this:
> > >>> ```
> > >>> CREATE TABLE KafkaTable (
> > >>>   origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
> > >>>   host_name STRING METADATA VIRTUAL DYNAMIC,
> > >>>   origin_properties MAP<STRING, STRING> METADATA FROM
> > >>>     'value.source.properties' VIRTUAL,
> > >>>   user_id BIGINT
> > >>> ) WITH (
> > >>>   'connector' = 'kafka',
> > >>>   'value.format' = 'debezium-json'
> > >>>   ...
> > >>> );
> > >>> ```
> > >>> Then users can use the `host_name` metadata: because it is a DYNAMIC
> > >>> metadata column, Flink doesn't throw an exception even though
> > >>> `host_name` did not belong to Kafka before, and developers don't
> > >>> need to modify or rebuild the Flink source code and redeploy Flink
> > >>> to the online environment (which comes at a high cost).
> > >>>
> > >>> Considering the return value:
> > >>> Kafka before (without this metadata): NULL
> > >>> Kafka now (with this metadata added): the concrete value
> > >>>
> > >>> The same user SQL works in the past, now, and even in the future,
> > >>> rather than checking and rejecting these new metadata columns or
> > >>> frequently modifying the connector implementation to support them.
> > >>> And it is opt-in, configured by adding 'DYNAMIC' to the metadata
> > >>> column (or some better mechanism).
> > >>>
> > >>> [1]
> > >>> https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/debezium/
> > >>>
> > >>> On Thu, Aug 25, 2022 at 21:07, Timo Walther <[email protected]> wrote:
> > >>>
> > >>>> Hi Ran,
> > >>>>
> > >>>> what would be the data type of this dynamic metadata column? The
> > >>>> planner and many parts of the stack will require a data type.
> > >>>>
> > >>>> Personally, I feel connector developers can already have the same
> > >>>> functionality by declaring a metadata column as `MAP<STRING, STRING>`.
> > >>>> This is what we expose already as `debezium.source.properties`. Whatever
> > >>>> Debezium adds will be available through this property and can be
> > >>>> accessed via `SELECT col['my-new-property'] FROM x`, including being
> > >>>> NULL by default if not present.
> > >>>>
> > >>>> Regards,
> > >>>> Timo
> > >>>>
> > >>>>
> > >>>> On 25.08.22 14:04, Ran Tao wrote:
> > >>>>> ```
> > >>>>> create table test_source (
> > >>>>>   __test_metadata__ varchar METADATA,
> > >>>>>   f0 varchar,
> > >>>>>   f1 varchar,
> > >>>>>   f2 bigint,
> > >>>>>   ts as CURRENT_TIMESTAMP
> > >>>>> ) with (
> > >>>>>   'connector' = 'test',
> > >>>>>   ...
> > >>>>> )
> > >>>>> ```
> > >>>>>
> > >>>>> If we do not pre-define `__test_metadata__` as a metadata key by
> > >>>>> implementing listReadableMetadata, running the above SQL causes an
> > >>>>> exception like this:
> > >>>>>
> > >>>>> org.apache.flink.table.api.ValidationException: Invalid metadata key
> > >>>>> '__test_metadata__' in column '__test_metadata__' of table
> > >>>>> 'default_catalog.default_database.test_source'. The DynamicTableSource
> > >>>>> class 'com.alipay.flink.connectors.test.source.TestDynamicTableSource'
> > >>>>> supports the following metadata keys for reading:
> > >>>>> xxx, yyy
> > >>>>>
> > >>>>>   at
> > >>>>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$5(DynamicSourceUtils.java:409)
> > >>>>>
> > >>>>> This is because a Flink metadata column must currently exist in the
> > >>>>> results returned by `listReadableMetadata`. So when a connector adds
> > >>>>> new metadata, we cannot use it directly unless we modify the
> > >>>>> connector code to support it. In some situations this is
> > >>>>> intolerable. Can we support a 'DYNAMIC' metadata column? Its basic
> > >>>>> mechanism is to skip checking the column against the existing
> > >>>>> metadata, so users can define it dynamically. If a connector lacks
> > >>>>> this metadata, the column value returns NULL; otherwise it returns
> > >>>>> the concrete value. It has great benefits in some scenarios.
> > >>>>>
> > >>>>> Looking forward to your opinions.
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>> --
> > >>> Best Regards,
> > >>> Ran Tao
> > >>> https://github.com/chucheng92
> > >>
> > >>
> > >
> > > --
> > > Best Regards,
> > > Ran Tao
> > > https://github.com/chucheng92
> >
> >
>
> --
> Best Regards,
> Ran Tao
> https://github.com/chucheng92
>
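For reference, the MAP-based workaround that Timo and Jark describe in the quoted messages can be written out as a single complete DDL. This is only a sketch: the table name, the connector options shown, and the 'my-new-property' key are illustrative, not taken from any particular deployment.

```sql
-- Sketch of the MAP<STRING, STRING> workaround discussed in the thread.
-- The table name and the 'my-new-property' key are illustrative.
CREATE TABLE KafkaTable (
  user_id BIGINT,
  -- All source properties exposed as a single map; keys the connector
  -- adds later simply appear as new entries, with no schema change.
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  -- Computed column casting a (possibly absent) property to a typed
  -- column; it evaluates to NULL when the key is not present.
  my_dynamic_meta AS CAST(origin_properties['my-new-property'] AS TIMESTAMP(3))
) WITH (
  'connector' = 'kafka',
  'value.format' = 'debezium-json'
);

-- Ad-hoc access without a computed column also works:
SELECT origin_properties['my-new-property'] FROM KafkaTable;
```

The trade-off raised in the thread still applies: the map covers metadata surfaced through the source properties, while metadata with its own native type (like the ingestion timestamp) still needs a declared, typed METADATA column known to the connector.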
