Yaroslav Tkachenko created FLINK-33622:
------------------------------------------

             Summary: table.optimizer.reuse-sub-plan-enabled doesn't work when 
a Table is converted to a DataStream 
                 Key: FLINK-33622
                 URL: https://issues.apache.org/jira/browse/FLINK-33622
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
            Reporter: Yaroslav Tkachenko
         Attachments: Screenshot 2023-11-22 at 11.09.46 AM.png, Screenshot 
2023-11-22 at 11.10.29 AM.png

I have a source (a DataStream converted to a Table), a SQL transformation 
(really anything, could be a join or a simple "SELECT * FROM"), and *two* Table 
API sinks (added via a StatementSet).

Here's the execution plan for this case:
{code:java}
Calc(select=[id, address, amount])(reuse_id=[1])
+- DropUpdateBefore
   +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])

Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- Reused(reference_id=[1])
{code}
As you can see, the transformation is reused by both sinks.

In another case, before writing the transformation's result to one of the sinks, 
I convert the Table to a DataStream and then back to a Table (I actually apply 
some filtering on the DataStream, but the problem persists even after removing 
it, so it's irrelevant).

In this case, the sinks don't reuse the results of the transformation; here's 
the execution plan:
{code:java}
Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- TableSourceScan(table=[[*anonymous_datastream_source$3*]], fields=[id, address, amount])

Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- Calc(select=[id, address, amount])
   +- DropUpdateBefore
      +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])
{code}
So, the data is processed twice. This could be a big problem for a heavy 
stateful operation.

This feels like a bug in the optimizer. The same non-reusing plan is produced by 
turning off the *table.optimizer.reuse-sub-plan-enabled* option.
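
For anyone trying to reproduce this, the two topologies described above can be sketched roughly as follows. This is only a minimal sketch, not the actual job: the class name, the view and sink names (src, sink_a, sink_b), the sample row, the use of the blackhole connector for both sinks, and the insert-only source (the real job has a changelog source, hence DropUpdateBefore in the plans) are all assumptions made for illustration.

```java
import java.util.List;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ReuseSubPlanRepro {

    /** Builds one source, one SQL transformation and two sinks, and returns the explained plan. */
    static String explainPlan(boolean roundTripThroughDataStream) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Enabled by default; set explicitly so the toggle is easy to flip to "false".
        tEnv.getConfig().set("table.optimizer.reuse-sub-plan-enabled", "true");

        // Source: a DataStream converted to a Table (sample data is hypothetical).
        DataStream<Row> input = env.fromCollection(
                List.of(Row.of(1L, "Transfer", "0xabc", 10.0)),
                Types.ROW_NAMED(
                        new String[] {"id", "event_signature", "address", "amount"},
                        Types.LONG, Types.STRING, Types.STRING, Types.DOUBLE));
        tEnv.createTemporaryView("src", tEnv.fromDataStream(input));

        // Two sinks; blackhole stands in for the real connectors.
        String columns = "(id BIGINT, address STRING, amount DOUBLE)";
        tEnv.executeSql("CREATE TEMPORARY TABLE sink_a " + columns + " WITH ('connector' = 'blackhole')");
        tEnv.executeSql("CREATE TEMPORARY TABLE sink_b " + columns + " WITH ('connector' = 'blackhole')");

        // The SQL transformation shared by both sinks.
        Table transformed = tEnv.sqlQuery("SELECT id, address, amount FROM src");

        // Second case: Table -> DataStream -> Table before one of the sinks.
        Table forSinkA = transformed;
        if (roundTripThroughDataStream) {
            DataStream<Row> asStream = tEnv.toDataStream(transformed);
            forSinkA = tEnv.fromDataStream(asStream);
        }

        StatementSet statements = tEnv.createStatementSet();
        statements.addInsert("sink_a", forSinkA);
        statements.addInsert("sink_b", transformed);
        return statements.explain();
    }

    public static void main(String[] args) {
        System.out.println("=== Both sinks fed from the Table ===");
        System.out.println(explainPlan(false));
        System.out.println("=== One sink fed via Table -> DataStream -> Table ===");
        System.out.println(explainPlan(true));
    }
}
```

Comparing the two printed plans should show whether a Reused node appears in each case; I have not asserted any particular output here, since the exact plan text depends on the Flink version.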

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
