[ 
https://issues.apache.org/jira/browse/FLINK-23911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403535#comment-17403535
 ] 

Jingsong Lee commented on FLINK-23911:
--------------------------------------

> I don't think I understand why a separate interface would be needed here

Hi [~airblader], Implements SupportsReadableMetadata does not mean the 
connector want to support metadata column projection push down.
For example:
CREATE TABLE kafka (..., meta1, meta2, meta3) WITH ('connector' = 'kafka', 
'group.id' = 'a_id');
INSERT INTO sink_1 SELECT ..., meta1 FROM kafka;
INSERT INTO sink_2 SELECT ..., meta2 FROM kafka;
This job will failed because two kafka source instances use same group.id. Why 
not only one instance? Because their meta column projection are different.

> Projections not selecting any metadata columns cause all declared metadata 
> columns to be applied to the source
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23911
>                 URL: https://issues.apache.org/jira/browse/FLINK-23911
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: Ingo Bürk
>            Assignee: Ingo Bürk
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> h1. New Description
> If a source implements SupportsReadingMetadata and 
> SupportsProjectionPushDown, a table is created declaring some metadata 
> columns and then queried with none of the metadata columns selected, 
> #applyReadableMetadata will be called with all metadata keys declared in the 
> schema. This causes unnecessary (and potentially expensive) calculations in 
> the source.
>  
> ----
> h1. Original Description
> Given a table with a declared schema containing some metadata columns, if we 
> select only some of those metadata columns (or none), the interface of 
> SupportsReadableMetadata states that the planner will perform the projection 
> and only push required metadata keys into the source:
> {quote}The planner will select required metadata columns (i.e. perform 
> projection push down) and will call \{@link #applyReadableMetadata(List, 
> DataType)} with a list of metadata keys.
> {quote}
> However, it seems that this doesn't happen, and the planner always applies 
> all metadata declared in the schema instead. This can be a problem because 
> the source has to do unnecessary work, and some metadata might be more 
> expensive to compute than others.
> For reference, SupportsProjectionPushDown can not be used to workaround this 
> because it operates only on physical columns, i.e. #applyProjections will 
> never be called with a projection for the metadata columns, even if they are 
> selected.
> The following test case can be executed to debug into #applyReadableMetadata 
> of the values table source:
> {code:java}
> @Test
> def test(): Unit = {
>   val tableId = TestValuesTableFactory.registerData(Seq())
>   tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
>     .schema(Schema.newBuilder()
>       .column("f0", DataTypes.INT())
>       .columnByMetadata("m1", DataTypes.STRING())
>       .columnByMetadata("m2", DataTypes.STRING())
>       .build())
>     .option("data-id", tableId)
>     .option("bounded", "true")
>     .option("readable-metadata", "m1:STRING,m2:STRING")
>     .build())
>   tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to