[ 
https://issues.apache.org/jira/browse/FLINK-33622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794590#comment-17794590
 ] 

Jeyhun Karimov commented on FLINK-33622:
----------------------------------------

Hi [~sap1ens], thanks for reporting the issue.

First, indeed, converting a {{Table}} to a {{DataStream}} and back to a {{Table}} loses some metadata. As a result, the optimizer can no longer recognize and reuse the common sub-plan(s).
Second, why don't you apply the filtering (that you did with the {{DataStream}}) directly with the {{Table}} API?
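
For illustration, a minimal sketch of that workaround (a sketch only, not from the report; the sink names, the column {{amount}}, and the predicate are hypothetical stand-ins):

{code:java}
import static org.apache.flink.table.api.Expressions.$;

// Convert the DataStream once, then stay in the Table API.
Table source = tableEnv.fromDataStream(dataStream);

// Apply the filter with Table#filter instead of DataStream#filter,
// so the planner sees a single plan it can deduplicate.
// ($("amount").isGreater(0) is a placeholder predicate.)
Table filtered = source.filter($("amount").isGreater(0));

// Both sinks consume the same Table; with
// table.optimizer.reuse-sub-plan-enabled (default: true) the common
// sub-plan should be computed once.
StatementSet stmtSet = tableEnv.createStatementSet();
stmtSet.addInsert("clickhouse_sink", filtered);
stmtSet.addInsert("blackhole_sink", filtered);
stmtSet.execute();
{code}

Since there is no intermediate {{DataStream}} round-trip, the sources in the resulting plan are not split into separate anonymous datastream sources, and sub-plan reuse should apply as in your first execution plan.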


> table.optimizer.reuse-sub-plan-enabled doesn't work when a Table is converted 
> to a DataStream 
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33622
>                 URL: https://issues.apache.org/jira/browse/FLINK-33622
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Yaroslav Tkachenko
>            Priority: Major
>         Attachments: Screenshot 2023-11-22 at 11.09.46 AM.png, Screenshot 
> 2023-11-22 at 11.10.29 AM.png
>
>
> I have a source (a DataStream converted to a Table), a SQL transformation 
> (really anything, could be a join or a simple "SELECT * FROM"), and *two* 
> Table API sinks (added via a 
> StatementSet).
> Here's the execution plan for this case:
> {code:java}
> Calc(select=[id, address, amount])(reuse_id=[1])
> +- DropUpdateBefore
>    +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])
>
> Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Reused(reference_id=[1])
>
> Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Reused(reference_id=[1])
> {code}
> As you can see, the transformation is computed once and reused by both sinks.
> In another case, before writing a transformation to one of the sinks, I 
> convert the Table to a DataStream and then back to a Table (I actually apply 
> some filtering on the DataStream, but the problem persists even after 
> removing it, so it's irrelevant).
> In this case, sinks don't reuse the results of the transformation; here's an 
> execution plan:
> {code:java}
> Sink(table=[default_catalog.default_database.clickhouse_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- TableSourceScan(table=[[*anonymous_datastream_source$3*]], fields=[id, address, amount])
>
> Sink(table=[default_catalog.default_database.blackhole_sink_for_factory_with_topologyId_clcy8vodu000108l6002fcsdm_], fields=[id, address, amount])
> +- Calc(select=[id, address, amount])
>    +- DropUpdateBefore
>       +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[id, event_signature, address, amount, contract_address])
> {code}
> So, the data is processed twice, which could be a big problem for a heavy 
> stateful operation. 
> This feels like a bug in the optimizer. The same execution plan can be 
> reproduced by turning off the *table.optimizer.reuse-sub-plan-enabled* option.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
