Thanks Timo for the confirmation. I've also raised FLINK-23911[1] for this.
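In case it helps anyone reproduce the problem from the connector side, below is a rough Java sketch of a source implementing both abilities. The class and field names are purely illustrative (not taken from the Flink code base or from the JIRA issue), and the signatures are the 1.13 ones; the comments mark what the SupportsReadableMetadata javadoc promises versus what I actually observe for a query like SELECT f0, m1 FROM T:

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadableMetadata;
import org.apache.flink.table.types.DataType;

public class MetadataRecordingSource
        implements ScanTableSource, SupportsReadableMetadata, SupportsProjectionPushDown {

    // What the planner actually pushed down, kept around for inspection in a test.
    private List<String> appliedMetadataKeys = Collections.emptyList();
    private int[][] appliedProjection = new int[0][];

    @Override
    public Map<String, DataType> listReadableMetadata() {
        // All metadata this source *could* produce.
        Map<String, DataType> metadata = new LinkedHashMap<>();
        metadata.put("m1", DataTypes.STRING());
        metadata.put("m2", DataTypes.STRING());
        return metadata;
    }

    @Override
    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
        // Expected per the javadoc for "SELECT f0, m1": only ["m1"].
        // Observed in 1.13.2: always ["m1", "m2"], i.e. all declared metadata columns.
        this.appliedMetadataKeys = metadataKeys;
    }

    @Override
    public boolean supportsNestedProjection() {
        return false;
    }

    @Override
    public void applyProjection(int[][] projectedFields) {
        // Observed: only indices of physical columns; metadata columns never show up here.
        this.appliedProjection = projectedFields;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        throw new UnsupportedOperationException("not relevant for this sketch");
    }

    @Override
    public DynamicTableSource copy() {
        return new MetadataRecordingSource();
    }

    @Override
    public String asSummaryString() {
        return "MetadataRecordingSource";
    }
}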
[1] https://issues.apache.org/jira/browse/FLINK-23911

Best
Ingo

On Mon, Aug 23, 2021 at 8:34 AM Timo Walther <twal...@apache.org> wrote:

> Hi everyone,
>
> this sounds definitely like a bug to me. Computing metadata might be
> very expensive, and a connector might expose a long list of metadata
> keys. It was therefore intended to project the metadata if possible. I'm
> pretty sure that this worked before (at least when implementing
> SupportsProjectionPushDown). Maybe a bug was introduced when adding the
> Spec support.
>
> Regards,
> Timo
>
>
> On 23.08.21 08:24, Ingo Bürk wrote:
> > Hi Jingsong,
> >
> > thanks for your answer. Even if the source implements
> > SupportsProjectionPushDown, #applyProjection will never be called with
> > projections for metadata columns. For example, I have the following test:
> >
> > @Test
> > def test(): Unit = {
> >   val tableId = TestValuesTableFactory.registerData(Seq())
> >
> >   tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
> >     .schema(Schema.newBuilder()
> >       .column("f0", DataTypes.INT())
> >       .columnByMetadata("m1", DataTypes.STRING())
> >       .columnByMetadata("m2", DataTypes.STRING())
> >       .build())
> >     .option("data-id", tableId)
> >     .option("bounded", "true")
> >     .option("readable-metadata", "m1:STRING,m2:STRING")
> >     .build())
> >
> >   tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
> > }
> >
> > Regardless of whether I select only f0 or f0 + m1, #applyReadableMetadata
> > is always called with m1 + m2, and #applyProjection only ever sees f0. So
> > as far as I can tell, the source has no way of knowing which metadata
> > columns are actually needed (under the projection); it always has to
> > produce metadata for all metadata columns declared in the table's schema.
> >
> > In PushProjectIntoTableSourceScanRule I also haven't yet found anything
> > that would suggest that metadata are first projected and only then pushed
> > to the source. I think the correct behavior should be to call
> > #applyReadableMetadata only after they have been considered in the
> > projection.
> >
> >
> > Best
> > Ingo
> >
> >
> > On Mon, Aug 23, 2021 at 5:05 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> >> Hi,
> >>
> >> I remember the projection only works with SupportsProjectionPushDown.
> >>
> >> You can take a look at
> >> `PushProjectIntoTableSourceScanRuleTest.testNestProjectWithMetadata`.
> >>
> >> applyReadableMetadata will be applied again in
> >> PushProjectIntoTableSourceScanRule.
> >>
> >> But there may be a bug in
> >> PushProjectIntoTableSourceScanRule.applyPhysicalAndMetadataPushDown:
> >>
> >> if (!usedMetadataNames.isEmpty()) {
> >>     sourceAbilitySpecs.add(new ReadingMetadataSpec(usedMetadataNames,
> >>             newProducedType));
> >> }
> >>
> >> If there is no meta column left, we should still apply it: we should tell
> >> the source that there is no meta column left after projection.
> >>
> >> Best,
> >> Jingsong
> >>
> >> On Fri, Aug 20, 2021 at 7:56 PM Ingo Bürk <i...@ververica.com> wrote:
> >>>
> >>> Hi everyone,
> >>>
> >>> according to the SupportsReadableMetadata interface, the planner is
> >>> supposed to project required metadata columns prior to applying them:
> >>>
> >>>> The planner will select required metadata columns (i.e. perform
> >>>> projection push down) and will call applyReadableMetadata(List, DataType)
> >>>> with a list of metadata keys.
> >>>
> >>> However, from my experiments it seems that this is not true: regardless
> >>> of what columns I select from a table, #applyReadableMetadata always
> >>> seems to be called with all metadata declared in the schema of the table.
> >>> Metadata columns are also excluded from
> >>> SupportsProjectionPushDown#applyProjection, so the source cannot perform
> >>> the projection either.
> >>>
> >>> This is in Flink 1.13.2. Am I misreading the docs here or is this not
> >>> working as intended?
> >>>
> >>>
> >>> Best
> >>> Ingo
> >>
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
> >
> >
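PS: to make the expected projection behaviour a bit more concrete, here is a simplified sketch of the idea. The helper name and the plain string matching are mine (the actual rule works on field indices and source ability specs rather than plain key lists), but conceptually the planner should intersect the declared metadata columns with the selected columns and push the result down even when it is empty:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MetadataProjectionSketch {

    // Keep only the declared metadata columns that survive the projection.
    // Even if the result is empty, applyReadableMetadata should still be
    // called so the source knows it does not need to produce any metadata.
    static List<String> projectMetadataKeys(List<String> declaredMetadata, List<String> selectedColumns) {
        List<String> used = new ArrayList<>();
        for (String key : declaredMetadata) {
            if (selectedColumns.contains(key)) {
                used.add(key);
            }
        }
        return used;
    }

    public static void main(String[] args) {
        // The table declares metadata columns m1 and m2; the query is SELECT f0, m1.
        List<String> declared = Arrays.asList("m1", "m2");
        List<String> selected = Arrays.asList("f0", "m1");
        System.out.println(projectMetadataKeys(declared, selected)); // prints [m1]
    }
}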