[ 
https://issues.apache.org/jira/browse/FLINK-23911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17403021#comment-17403021
 ] 

Ingo Bürk edited comment on FLINK-23911 at 8/23/21, 8:13 AM:
-------------------------------------------------------------

We should also still document the fact that #applyReadableMetadata (and other 
abilities, I assume) should be idempotent. The source is copied before the 
specs are applied, but an implementation like this looks totally fine based on 
the interface docs, but is actually broken (pseudo-code):
{code:java}
class MySource implements SupportsReadableMetadata, SupportsProjectionPushDown {
  private final List<String> metadataKeys = new ArrayList<>();

  public MySource copy() {
    MySource copy = new MySource();
    copy.metadataKeys.addAll(metadataKeys);
    return copy;
  }

  public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedType) {
    this.metadataKeys.addAll(metadataKeys);
  }
}{code}
Since the source is created and applyReadableMetadata is called with all 
metadata keys, copying it and then calling it with a subset of keys would only 
add duplicates.


was (Author: airblader):
We should also still document the fact that #applyReadableMetadata (and other 
abilities, I assume) should be idempotent. The source is copied before the 
specs are applied, but an implementation like this looks totally fine based on 
the interface docs, but is actually broken (pseudo-code):
{code:java}
class MySource implements SupportsReadableMetadata, SupportsProjectionPushDown {
  private final List<String> metadataKeys = new ArrayList<>();

  public MySource copy() {
    MySource copy = new MySource();
    copy.metadataKeys.addAll(metadataKeys);
  }

  public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedType) {
    this.metadataKeys.addAll(metadataKeys);
  }
}{code}
Since the source is created and applyReadableMetadata is called with all 
metadata keys, copying it and then calling it with a subset of keys would only 
add duplicates.

> Projections are not considered when pushing readable metadata into a source
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-23911
>                 URL: https://issues.apache.org/jira/browse/FLINK-23911
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: Ingo Bürk
>            Priority: Major
>
> Given a table with a declared schema containing some metadata columns, if we 
> select only some of those metadata columns (or none), the interface of 
> SupportsReadableMetadata states that the planner will perform the projection 
> and only push required metadata keys into the source:
> {quote}The planner will select required metadata columns (i.e. perform 
> projection push down) and will call \{@link #applyReadableMetadata(List, 
> DataType)} with a list of metadata keys.{quote}
> However, it seems that this doesn't happen, and the planner always applies 
> all metadata declared in the schema instead. This can be a problem because 
> the source has to do unnecessary work, and some metadata might be more 
> expensive to compute than others.
> For reference, SupportsProjectionPushDown can not be used to workaround this 
> because it operates only on physical columns, i.e. #applyProjections will 
> never be called with a projection for the metadata columns, even if they are 
> selected.
> The following test case can be executed to debug into #applyReadableMetadata 
> of the values table source:
> {code:java}
> @Test
> def test(): Unit = {
>   val tableId = TestValuesTableFactory.registerData(Seq())
>   tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
>     .schema(Schema.newBuilder()
>       .column("f0", DataTypes.INT())
>       .columnByMetadata("m1", DataTypes.STRING())
>       .columnByMetadata("m2", DataTypes.STRING())
>       .build())
>     .option("data-id", tableId)
>     .option("bounded", "true")
>     .option("readable-metadata", "m1:STRING,m2:STRING")
>     .build())
>   tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to