Yaroslav Tkachenko created FLINK-33622:
------------------------------------------
Summary: table.optimizer.reuse-sub-plan-enabled doesn't work when
a Table is converted to a DataStream
Key: FLINK-33622
URL: https://issues.apache.org/jira/browse/FLINK-33622
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Reporter: Yaroslav Tkachenko
Attachments: Screenshot 2023-11-22 at 11.09.46 AM.png, Screenshot
2023-11-22 at 11.10.29 AM.png
I have a source (a DataStream converted to a Table), a SQL transformation (really anything; it could be a join or a simple "SELECT * FROM"), and *two* Table API sinks (added via a StatementSet).
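A minimal, self-contained sketch of this setup (the table names, the sample row, and the use of the blackhole connector for both sinks are illustrative, not taken from the actual job):
{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// DataStream converted to a Table (the *anonymous_datastream_source* in the plans below).
DataStream<Row> events = env
    .fromElements(Row.of(1L, "Transfer", "0xabc", 100L, "0xdef"))
    .returns(Types.ROW_NAMED(
        new String[] {"id", "event_signature", "address", "amount", "contract_address"},
        Types.LONG, Types.STRING, Types.STRING, Types.LONG, Types.STRING));
tableEnv.createTemporaryView("events", tableEnv.fromDataStream(events));

// Placeholder sinks; both use the blackhole connector only so the sketch runs anywhere.
tableEnv.executeSql("CREATE TABLE clickhouse_sink (id BIGINT, address STRING, amount BIGINT) "
    + "WITH ('connector' = 'blackhole')");
tableEnv.executeSql("CREATE TABLE blackhole_sink (id BIGINT, address STRING, amount BIGINT) "
    + "WITH ('connector' = 'blackhole')");

// One SQL transformation, written to two sinks via a StatementSet.
Table transformed = tableEnv.sqlQuery("SELECT id, address, amount FROM events");
StatementSet statements = tableEnv.createStatementSet();
statements.addInsert("clickhouse_sink", transformed);
statements.addInsert("blackhole_sink", transformed);
System.out.println(statements.explain());
{code}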
Here's the execution plan for this case:
{code:java}
Calc(select=[id, address, amount])(reuse_id=[1])
+- DropUpdateBefore
   +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])

Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- Reused(reference_id=[1])
{code}
As you can see, the transformation is reused by both sinks.
In another case, before writing the transformation result to one of the sinks, I convert the Table to a DataStream and then back to a Table (I actually apply some filtering on the DataStream, but the problem persists even after removing it, so the filtering is irrelevant); a rough sketch of this variant follows below.
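Continuing the sketch above, the round-trip variant would look roughly like this (toChangelogStream/fromChangelogStream are my guess at the conversion; the optional DataStream-level filtering is omitted since the behaviour is the same without it):
{code:java}
// Round-trip one branch through a DataStream before its sink (filtering omitted).
DataStream<Row> roundTrip = tableEnv.toChangelogStream(transformed);
Table backToTable = tableEnv.fromChangelogStream(roundTrip);

StatementSet statements2 = tableEnv.createStatementSet();
statements2.addInsert("clickhouse_sink", backToTable); // branch that went through the DataStream
statements2.addInsert("blackhole_sink", transformed);  // branch that consumes the Table directly
System.out.println(statements2.explain());
{code}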
In this case, the sinks don't reuse the result of the transformation; here's the execution plan:
{code:java}
Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- TableSourceScan(table=[[*anonymous_datastream_source$3*]], fields=[id, address, amount])

Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
+- Calc(select=[id, address, amount])
   +- DropUpdateBefore
      +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])
{code}
So, the data is processed twice, which can be a big problem for a heavy stateful operation.
This feels like a bug in the optimizer: the resulting plan is the same one you get by turning off the *table.optimizer.reuse-sub-plan-enabled* option.
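For comparison, the option can be toggled explicitly (it defaults to true):
{code:java}
// Disabling sub-plan reuse explicitly reproduces the duplicated plan above.
tableEnv.getConfig().set("table.optimizer.reuse-sub-plan-enabled", "false");
{code}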