[
https://issues.apache.org/jira/browse/FLINK-23911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403012#comment-17403012
]
Jingsong Lee commented on FLINK-23911:
--------------------------------------
I'm not sure if we need to separate Metadata projection push down from
SupportProjectionPushDown. If we want to, we may need to introduce an interface
for this.
The (physical/metadata) projection push down should an option. Because it may
be a dangerous thing. Maybe we cannot start multiple instances of a same stream
source. If the project causes the source to not be reused and multiple
instances occur, this may lead to exceptions. For example Kafka consume group
can not be reused by multiple instances.
> Projections are not considered when pushing readable metadata into a source
> ---------------------------------------------------------------------------
>
> Key: FLINK-23911
> URL: https://issues.apache.org/jira/browse/FLINK-23911
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.13.2
> Reporter: Ingo Bürk
> Priority: Major
>
> Given a table with a declared schema containing some metadata columns, if we
> select only some of those metadata columns (or none), the interface of
> SupportsReadableMetadata states that the planner will perform the projection
> and only push required metadata keys into the source:
> {quote}The planner will select required metadata columns (i.e. perform
> projection push down) and will call \{@link #applyReadableMetadata(List,
> DataType)} with a list of metadata keys.{quote}
> However, it seems that this doesn't happen, and the planner always applies
> all metadata declared in the schema instead. This can be a problem because
> the source has to do unnecessary work, and some metadata might be more
> expensive to compute than others.
> For reference, SupportsProjectionPushDown can not be used to workaround this
> because it operates only on physical columns, i.e. #applyProjections will
> never be called with a projection for the metadata columns, even if they are
> selected.
> The following test case can be executed to debug into #applyReadableMetadata
> of the values table source:
> {code:java}
> @Test
> def test(): Unit = {
> val tableId = TestValuesTableFactory.registerData(Seq())
> tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
> .schema(Schema.newBuilder()
> .column("f0", DataTypes.INT())
> .columnByMetadata("m1", DataTypes.STRING())
> .columnByMetadata("m2", DataTypes.STRING())
> .build())
> .option("data-id", tableId)
> .option("bounded", "true")
> .option("readable-metadata", "m1:STRING,m2:STRING")
> .build())
> tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)