[ 
https://issues.apache.org/jira/browse/FLINK-23911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ingo Bürk updated FLINK-23911:
------------------------------
    Description: 
h1. New Description

If a source implements both SupportsReadingMetadata and 
SupportsProjectionPushDown, and a table declaring some metadata columns is 
queried without selecting any of them, #applyReadableMetadata is still called 
with all metadata keys declared in the schema. This causes unnecessary (and 
potentially expensive) computation in the source.

 
----
h1. Original Description

Given a table with a declared schema containing some metadata columns, if we 
select only some of those metadata columns (or none), the Javadoc of 
SupportsReadingMetadata states that the planner will perform the projection 
and push only the required metadata keys into the source:
{quote}The planner will select required metadata columns (i.e. perform 
projection push down) and will call \{@link #applyReadableMetadata(List, 
DataType)} with a list of metadata keys.
{quote}
However, it seems that this doesn't happen, and the planner always applies all 
metadata declared in the schema instead. This can be a problem because the 
source has to do unnecessary work, and some metadata might be more expensive to 
compute than others.

For reference, SupportsProjectionPushDown cannot be used to work around this 
because it operates only on physical columns, i.e. #applyProjection will never 
be called with a projection for the metadata columns, even if they are selected.
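
To make the expected behavior concrete, here is a minimal, self-contained Java 
sketch of the pruning that the Javadoc describes. It does not use the Flink 
planner. The class MetadataKeyPruning and the method requiredMetadataKeys are 
hypothetical names introduced only for illustration, and each metadata column 
is assumed to use its own name as the metadata key, as in the test case below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not part of Flink. */
public class MetadataKeyPruning {

    /**
     * Returns the metadata keys that a query actually needs.
     *
     * @param metadataColumns declared metadata columns (column name -> metadata key),
     *                        in schema order
     * @param selectedColumns names of all columns referenced by the query
     */
    static List<String> requiredMetadataKeys(
            Map<String, String> metadataColumns, Set<String> selectedColumns) {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, String> column : metadataColumns.entrySet()) {
            // Keep a key only if its column is referenced by the query.
            if (selectedColumns.contains(column.getKey())) {
                keys.add(column.getValue());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Assumption: column names equal their metadata keys (m1, m2).
        Map<String, String> declared = new LinkedHashMap<>();
        declared.put("m1", "m1");
        declared.put("m2", "m2");

        // SELECT f0, m1 FROM T: only m1 should reach the source.
        Set<String> someSelected = new HashSet<>(Arrays.asList("f0", "m1"));
        System.out.println(requiredMetadataKeys(declared, someSelected)); // prints [m1]

        // SELECT f0 FROM T: no metadata key should reach the source.
        Set<String> noneSelected = new HashSet<>(Arrays.asList("f0"));
        System.out.println(requiredMetadataKeys(declared, noneSelected)); // prints []
    }
}
```

In these terms, the bug is that the list handed to #applyReadableMetadata 
contains every declared key instead of the pruned list computed above.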

The following test case can be executed to debug into #applyReadableMetadata of 
the values table source:
{code:scala}
@Test
def test(): Unit = {
  // Register an empty data set; only the planner's calls into the source matter.
  val tableId = TestValuesTableFactory.registerData(Seq())

  // Declare one physical column (f0) and two metadata columns (m1, m2).
  tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
    .schema(Schema.newBuilder()
      .column("f0", DataTypes.INT())
      .columnByMetadata("m1", DataTypes.STRING())
      .columnByMetadata("m2", DataTypes.STRING())
      .build())
    .option("data-id", tableId)
    .option("bounded", "true")
    .option("readable-metadata", "m1:STRING,m2:STRING")
    .build())

  // Set a breakpoint in #applyReadableMetadata to inspect the pushed metadata keys.
  tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
}
{code}

  was:
Given a table with a declared schema containing some metadata columns, if we 
select only some of those metadata columns (or none), the Javadoc of 
SupportsReadingMetadata states that the planner will perform the projection 
and push only the required metadata keys into the source:
{quote}The planner will select required metadata columns (i.e. perform 
projection push down) and will call \{@link #applyReadableMetadata(List, 
DataType)} with a list of metadata keys.{quote}
However, it seems that this doesn't happen, and the planner always applies all 
metadata declared in the schema instead. This can be a problem because the 
source has to do unnecessary work, and some metadata might be more expensive to 
compute than others.

For reference, SupportsProjectionPushDown cannot be used to work around this 
because it operates only on physical columns, i.e. #applyProjection will never 
be called with a projection for the metadata columns, even if they are selected.

The following test case can be executed to debug into #applyReadableMetadata of 
the values table source:
{code:java}
@Test
def test(): Unit = {
  val tableId = TestValuesTableFactory.registerData(Seq())

  tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
    .schema(Schema.newBuilder()
      .column("f0", DataTypes.INT())
      .columnByMetadata("m1", DataTypes.STRING())
      .columnByMetadata("m2", DataTypes.STRING())
      .build())
    .option("data-id", tableId)
    .option("bounded", "true")
    .option("readable-metadata", "m1:STRING,m2:STRING")
    .build())

  tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
}
{code}


> Projections not selecting any metadata columns cause all declared metadata 
> columns to be applied to the source
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-23911
>                 URL: https://issues.apache.org/jira/browse/FLINK-23911
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: Ingo Bürk
>            Assignee: Ingo Bürk
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> h1. New Description
> If a source implements both SupportsReadingMetadata and 
> SupportsProjectionPushDown, and a table declaring some metadata columns is 
> queried without selecting any of them, #applyReadableMetadata is still called 
> with all metadata keys declared in the schema. This causes unnecessary (and 
> potentially expensive) computation in the source.
>  
> ----
> h1. Original Description
> Given a table with a declared schema containing some metadata columns, if we 
> select only some of those metadata columns (or none), the Javadoc of 
> SupportsReadingMetadata states that the planner will perform the projection 
> and push only the required metadata keys into the source:
> {quote}The planner will select required metadata columns (i.e. perform 
> projection push down) and will call \{@link #applyReadableMetadata(List, 
> DataType)} with a list of metadata keys.
> {quote}
> However, it seems that this doesn't happen, and the planner always applies 
> all metadata declared in the schema instead. This can be a problem because 
> the source has to do unnecessary work, and some metadata might be more 
> expensive to compute than others.
> For reference, SupportsProjectionPushDown cannot be used to work around this 
> because it operates only on physical columns, i.e. #applyProjection will 
> never be called with a projection for the metadata columns, even if they are 
> selected.
> The following test case can be executed to debug into #applyReadableMetadata 
> of the values table source:
> {code:scala}
> @Test
> def test(): Unit = {
>   val tableId = TestValuesTableFactory.registerData(Seq())
>   tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
>     .schema(Schema.newBuilder()
>       .column("f0", DataTypes.INT())
>       .columnByMetadata("m1", DataTypes.STRING())
>       .columnByMetadata("m2", DataTypes.STRING())
>       .build())
>     .option("data-id", tableId)
>     .option("bounded", "true")
>     .option("readable-metadata", "m1:STRING,m2:STRING")
>     .build())
>   tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
