[ https://issues.apache.org/jira/browse/FLINK-33670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793232#comment-17793232 ]

Jeyhun Karimov commented on FLINK-33670:
----------------------------------------

Hi [~zicat], I tried with the Flink master branch; below is the plan I get for your
case. Looking at the optimized execution plan, it seems the deduplicate part
(reuse_id=1) is reused between the two sinks. Can you confirm?
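
For reference, here is a minimal sketch of how a plan like the one below can be
produced. It is not the exact program I ran: the issue elides the source
connector options, so the 'datagen' settings are placeholders, and I use batch
mode because the Sort/HashWindowAggregate nodes below are batch physical
operators.

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class Flink33670Repro {

    public static void main(String[] args) {
        // Batch mode matches the Sort/HashWindowAggregate nodes in the plans below.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // DDL from the issue description; 'datagen' is only a stand-in for the
        // elided source connector, bounded so it is valid in batch mode.
        tEnv.executeSql(
                "CREATE TABLE source ("
                        + "  id STRING,"
                        + "  ts TIMESTAMP(3),"
                        + "  v  BIGINT,"
                        + "  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND"
                        + ") WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql(
                "CREATE VIEW source_distinct AS "
                        + "SELECT * FROM ("
                        + "  SELECT *, ROW_NUMBER() OVER w AS row_nu"
                        + "  FROM source"
                        + "  WINDOW w AS (PARTITION BY id ORDER BY PROCTIME() ASC)"
                        + ") WHERE row_nu = 1");
        tEnv.executeSql(
                "CREATE TABLE print1 (id STRING, ts TIMESTAMP(3)) "
                        + "WITH ('connector' = 'blackhole')");
        tEnv.executeSql(
                "CREATE TABLE print2 (id STRING, ts TIMESTAMP(3), v BIGINT) "
                        + "WITH ('connector' = 'blackhole')");

        // Both INSERTs must go into one StatementSet: the planner only reuses
        // common sub-plans across sinks that are optimized together.
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(
                "INSERT INTO print1 SELECT id, ts FROM source_distinct");
        stmtSet.addInsertSql(
                "INSERT INTO print2 "
                        + "SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v) "
                        + "FROM source_distinct "
                        + "GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id");

        // Prints the AST, the optimized physical plan, and the optimized
        // execution plan.
        System.out.println(stmtSet.explain());
    }
}
{code}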

{code}
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- LogicalProject(id=[$0], ts=[$1])
   +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[$3])
      +- LogicalFilter(condition=[=($3, 1)])
         +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
            +- LogicalTableScan(table=[[default_catalog, default_database, source]])

LogicalSink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, EXPR$2])
+- LogicalProject(id=[$1], EXPR$1=[TUMBLE_START($0)], EXPR$2=[$2])
   +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
      +- LogicalProject($f0=[$TUMBLE($1, 20000:INTERVAL SECOND)], id=[$0], v=[$2])
         +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[$3])
            +- LogicalFilter(condition=[=($3, 1)])
               +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
                  +- LogicalTableScan(table=[[default_catalog, default_database, source]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- Calc(select=[id, ts], where=[=(w0$o0, 1)])
   +- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, $3, w0$o0])
      +- Sort(orderBy=[id ASC, $3 ASC])
         +- Calc(select=[id, ts, v, PROCTIME() AS $3])
            +- Exchange(distribution=[hash[id]])
               +- TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, ts, v])

Sink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, EXPR$2])
+- Calc(select=[id, w$start AS EXPR$1, EXPR$2])
   +- HashWindowAggregate(groupBy=[id], window=[TumblingGroupWindow('w$, ts, 20000)], properties=[w$start, w$end, w$rowtime], select=[id, SUM(v) AS EXPR$2])
      +- Calc(select=[ts, id, v], where=[=(w0$o0, 1)])
         +- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, $3, w0$o0])
            +- Sort(orderBy=[id ASC, $3 ASC])
               +- Calc(select=[id, ts, v, PROCTIME() AS $3])
                  +- Exchange(distribution=[hash[id]])
                     +- TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, ts, v])

== Optimized Execution Plan ==
OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, $3, w0$o0])(reuse_id=[1])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[id ASC, $3 ASC])
      +- Exchange(distribution=[keep_input_as_is[hash[id]]])
         +- Calc(select=[id, ts, v, PROCTIME() AS $3])
            +- Exchange(distribution=[hash[id]])
               +- TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, ts, v])

Sink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- Calc(select=[id, ts], where=[(w0$o0 = 1)])
   +- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, EXPR$2])
+- Calc(select=[id, w$start AS EXPR$1, EXPR$2])
   +- HashWindowAggregate(groupBy=[id], window=[TumblingGroupWindow('w$, ts, 20000)], properties=[w$start, w$end, w$rowtime], select=[id, SUM(v) AS EXPR$2])
      +- Exchange(distribution=[keep_input_as_is[hash[id]]])
         +- Calc(select=[ts, id, v], where=[(w0$o0 = 1)])
            +- Exchange(distribution=[keep_input_as_is[hash[ts]]])
               +- Reused(reference_id=[1])
{code}
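
As a side note, sub-plan reuse is governed by the optimizer options below, both
enabled by default, so the reuse above happens out of the box. A snippet (not a
full program; it assumes the same tEnv as in the sketch above) in case you want
to rule out a configuration difference on your side:

{code:java}
// Sub-plan reuse is enabled by default; setting the options explicitly only
// makes the relevant knobs visible when comparing environments.
tEnv.getConfig().set("table.optimizer.reuse-sub-plan-enabled", "true");
tEnv.getConfig().set("table.optimizer.reuse-source-enabled", "true");
{code}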

> Public operators cannot be reused in multi sinks
> ------------------------------------------------
>
>                 Key: FLINK-33670
>                 URL: https://issues.apache.org/jira/browse/FLINK-33670
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Lyn Zhang
>            Priority: Major
>         Attachments: image-2023-11-28-14-31-30-153.png
>
>
> Dear all:
>    I find that some public operators cannot be reused when submitting a job
> with multiple sinks. Here is an example:
> {code:java}
> CREATE TABLE source (
>     id              STRING,
>     ts              TIMESTAMP(3),
>     v               BIGINT,
>     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> ) WITH (...);
> CREATE VIEW source_distinct AS
> SELECT * FROM (
>     SELECT *, ROW_NUMBER() OVER w AS row_nu
>     FROM source
>     WINDOW w AS (PARTITION BY id ORDER BY proctime() ASC)
> ) WHERE row_nu = 1;
> CREATE TABLE print1 (
>      id             STRING,
>      ts             TIMESTAMP(3)
> ) WITH('connector' = 'blackhole');
> INSERT INTO print1 SELECT id, ts FROM source_distinct;
> CREATE TABLE print2 (
>      id              STRING,
>      ts              TIMESTAMP(3),
>      v               BIGINT
> ) WITH('connector' = 'blackhole');
> INSERT INTO print2
> SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v)
> FROM source_distinct
> GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id; {code}
> !image-2023-11-28-14-31-30-153.png|width=384,height=145!
> I checked the code: Flink adds the CoreRules.PROJECT_MERGE rule by default,
> which produces different digests for the deduplicate operator, so the planner
> finally fails to match the same sub-plan.
> In a real production environment, reusing a common sub-plan such as the
> deduplicate is more valuable than merging projections. A good solution would
> be to stop the project merge from crossing shuffle operators in multi-sink
> cases.
> What do you think? Looking forward to your reply.


