[
https://issues.apache.org/jira/browse/FLINK-33622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794590#comment-17794590
]
Jeyhun Karimov commented on FLINK-33622:
----------------------------------------
Hi [~sap1ens] thanks for reporting the issue.
First, converting a {{Table}} to a {{DataStream}} and back to a {{Table}} does
indeed lose some metadata. As a result, the optimizer can no longer recognize
and reuse the common sub-plan(s).
Second, could you apply the filtering (which you currently do on the
{{DataStream}}) directly with the {{Table}} API instead?
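For illustration, a rough sketch of that approach (the table/sink names and the filter predicate here are placeholders, not taken from the report): keeping both branches in the Table API lets the planner see a single common sub-plan.
{code:java}
// Sketch only: names and predicate are hypothetical.
Table events = tableEnv.fromDataStream(stream);                  // one source table
Table projected = events.select($("id"), $("address"), $("amount"));

// Filter with Table API operators instead of a DataStream round-trip,
// so the common sub-plan stays visible to the optimizer:
Table filtered = projected.filter($("amount").isGreater(0));

StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsert("clickhouse_sink", filtered);   // filtered branch
stmtSet.addInsert("blackhole_sink", projected);   // unfiltered branch
stmtSet.execute();
{code}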
> table.optimizer.reuse-sub-plan-enabled doesn't work when a Table is converted
> to a DataStream
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-33622
> URL: https://issues.apache.org/jira/browse/FLINK-33622
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Yaroslav Tkachenko
> Priority: Major
> Attachments: Screenshot 2023-11-22 at 11.09.46 AM.png, Screenshot
> 2023-11-22 at 11.10.29 AM.png
>
>
> I have a source (a DataStream converted to a Table), a SQL transformation
> (really anything; it could be a join or a simple "SELECT * FROM"), and *two*
> Table API sinks (added via a StatementSet).
> Here's the execution plan for this case:
> {code:java}
> Calc(select=[id, address, amount])(reuse_id=[1])
> +- DropUpdateBefore
>    +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])
>
> Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Reused(reference_id=[1])
>
> Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Reused(reference_id=[1])
> {code}
> As you can see, the transformation is reused by both sinks.
> In another case, before writing the transformation to one of the sinks, I
> convert the Table to a DataStream and then back to a Table. (I actually apply
> some filtering on the DataStream, but the problem persists even after
> removing it, so the filtering is irrelevant.)
> In this case, sinks don't reuse the results of the transformation; here's an
> execution plan:
> {code:java}
> Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- TableSourceScan(table=[[*anonymous_datastream_source$3*]], fields=[id, address, amount])
>
> Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Calc(select=[id, address, amount])
>    +- DropUpdateBefore
>       +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])
> {code}
> So, the data is processed twice. This could be a big problem for a heavy
> stateful operation.
> This feels like a bug in the optimizer. The same plan can be reproduced by
> turning off the *table.optimizer.reuse-sub-plan-enabled* option.
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)