Yaroslav Tkachenko created FLINK-33622:
------------------------------------------
             Summary: table.optimizer.reuse-sub-plan-enabled doesn't work when a Table is converted to a DataStream
                 Key: FLINK-33622
                 URL: https://issues.apache.org/jira/browse/FLINK-33622
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Yaroslav Tkachenko
         Attachments: Screenshot 2023-11-22 at 11.09.46 AM.png, Screenshot 2023-11-22 at 11.10.29 AM.png


I have a source (a DataStream converted to a Table), a SQL transformation (really anything; it could be a join or a simple "SELECT * FROM"), and *two* Table API sinks (added via a StatementSet). Here's the execution plan for this case:

{code:java}
Calc(select=[id, address, amount])(reuse_id=[1])
+- DropUpdateBefore
   +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])

Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- Reused(reference_id=[1])
{code}

As you can see, the transformation is reused by both sinks.

In another case, before writing the transformation to one of the sinks, I convert the Table to a DataStream and then back to a Table (I actually apply some filtering on the DataStream, but the problem persists even after removing it, so it's irrelevant). In this case, the sinks don't reuse the result of the transformation; here's the execution plan:

{code:java}
Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- TableSourceScan(table=[[*anonymous_datastream_source$3*]], fields=[id, address, amount])

Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- Calc(select=[id, address, amount])
   +- DropUpdateBefore
      +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])
{code}

So the data is processed twice, which could be a big problem for a heavy stateful operation. This feels like a bug in the optimizer.

The same plan can also be produced by turning off the *table.optimizer.reuse-sub-plan-enabled* option.
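For reference, here's a minimal sketch of the second case. The sink names (sink_a/sink_b), the blackhole connector, and the sample schema are illustrative assumptions rather than the actual job; the only relevant parts are the StatementSet with two sinks and the Table -> DataStream -> Table round-trip before one of them:

{code:java}
// Hypothetical reproduction sketch; names and schema are illustrative, not from the actual job.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SubPlanReuseRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Source: a DataStream converted to a Table.
        DataStream<Row> events = env
            .fromElements(Row.of(1L, "sig", "0xabc", 100L, "0xdef"))
            .returns(Types.ROW_NAMED(
                new String[] {"id", "event_signature", "address", "amount", "contract_address"},
                Types.LONG, Types.STRING, Types.STRING, Types.LONG, Types.STRING));
        tEnv.createTemporaryView("events", events);

        // Two stand-in sinks (blackhole here instead of the real ClickHouse/blackhole sinks).
        tEnv.executeSql(
            "CREATE TABLE sink_a (id BIGINT, address STRING, amount BIGINT) WITH ('connector' = 'blackhole')");
        tEnv.executeSql(
            "CREATE TABLE sink_b (id BIGINT, address STRING, amount BIGINT) WITH ('connector' = 'blackhole')");

        // The shared SQL transformation (could be anything).
        Table transformed = tEnv.sqlQuery("SELECT id, address, amount FROM events");

        // Round-trip through DataStream before one of the sinks; this is what breaks sub-plan reuse.
        DataStream<Row> roundTrip = tEnv.toChangelogStream(transformed);
        Table transformedAgain = tEnv.fromChangelogStream(roundTrip);

        // Both sinks added via a StatementSet.
        StreamStatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsert("sink_a", transformed);
        stmtSet.addInsert("sink_b", transformedAgain);

        // The second plan from the description (no Reused node) shows up here.
        System.out.println(stmtSet.explain());

        // Disabling sub-plan reuse explicitly should produce a similarly duplicated plan
        // even without the round-trip:
        // tEnv.getConfig().getConfiguration().setBoolean("table.optimizer.reuse-sub-plan-enabled", false);
    }
}
{code}

Without the toChangelogStream/fromChangelogStream round-trip (i.e. adding *transformed* to both sinks), explain() should show the Reused(reference_id=[1]) node as in the first plan above.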