[ https://issues.apache.org/jira/browse/FLINK-33622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794590#comment-17794590 ]
Jeyhun Karimov commented on FLINK-33622:
----------------------------------------

Hi [~sap1ens] thanks for reporting the issue. First, indeed the conversion of a {{Table}} to a {{DataStream}} and back to a {{Table}} leads to some metadata loss. As a result, the optimizer cannot reuse the common plan(s). Second, why don't you apply the filtering (that you did with the {{DataStream}}) with the {{Table}} API?

> table.optimizer.reuse-sub-plan-enabled doesn't work when a Table is converted to a DataStream
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33622
>                 URL: https://issues.apache.org/jira/browse/FLINK-33622
>             Project: Flink
>          Issue Type: Bug
>      Components: Table SQL / Planner
>            Reporter: Yaroslav Tkachenko
>            Priority: Major
>     Attachments: Screenshot 2023-11-22 at 11.09.46 AM.png, Screenshot 2023-11-22 at 11.10.29 AM.png
>
> I have a source (a DataStream converted to a Table), a SQL transformation (really anything, could be a join or a simple "SELECT * FROM"), and *two* Table API sinks (added via a StatementSet).
> Here's the execution plan for this case:
> {code:java}
> Calc(select=[id, address, amount])(reuse_id=[1])
> +- DropUpdateBefore
>    +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])
>
> Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Reused(reference_id=[1])
>
> Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Reused(reference_id=[1]) {code}
> As you can see, the transformation is reused by both sinks.
> In another case, before writing the transformation to one of the sinks, I convert the Table to a DataStream and then back to a Table (I actually apply some filtering on the DataStream, but the problem persists even after removing it, so it's irrelevant).
> In this case, the sinks don't reuse the results of the transformation; here's the execution plan:
> {code:java}
> Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- TableSourceScan(table=[[*anonymous_datastream_source$3*]], fields=[id, address, amount])
>
> Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Calc(select=[id, address, amount])
>    +- DropUpdateBefore
>       +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address]) {code}
> So, the data is processed twice. This could be a big problem for a heavy stateful operation.
> This feels like a bug in the optimizer. The same situation can be reproduced by turning off the *table.optimizer.reuse-sub-plan-enabled* option.
>
>
> --
> This message was sent by Atlassian Jira
> (v8.20.10#820010)
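
The suggestion above (keeping the filter in the Table API so no Table-to-DataStream round trip discards planner metadata) could look roughly like the sketch below. This is only an illustration, not the reporter's actual code: the source table name ({{events}}), the filter predicate, and the sink names are hypothetical placeholders.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Common transformation; stays a Table end to end, so the planner keeps
// its metadata and can recognize the shared sub-plan.
Table transformed = tEnv.sqlQuery("SELECT id, address, amount FROM events");

// Apply the filter with the Table API instead of converting to a
// DataStream and back (predicate is a hypothetical example).
Table filtered = transformed.filter($("amount").isGreater(0));

// Register both inserts in one StatementSet so the optimizer sees both
// plans together and can reuse the common transformation.
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsert("clickhouse_sink", transformed);
stmtSet.addInsert("blackhole_sink", filtered);
stmtSet.execute();
{code}

With this arrangement, both sinks should again share the Calc/DropUpdateBefore/TableSourceScan sub-plan, as in the first execution plan above.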