[
https://issues.apache.org/jira/browse/FLINK-33670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793232#comment-17793232
]
Jeyhun Karimov commented on FLINK-33670:
----------------------------------------
Hi [~zicat] I tried with Flink master branch, below is the plan I get for your
case. Looking at the Optimized execution plan, it seems the deduplicate part
(reuse_id=1) is reused between the two sinks. Do you confirm?
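
For reference, here is a minimal reproduction sketch (assumptions on my side:
batch mode, since the plans below show batch operators like Sort and
HashWindowAggregate, and a bounded 'datagen' connector for {{source}}, whose
connector is elided in the issue):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class Flink33670Repro {

    public static void main(String[] args) {
        // Batch mode assumed, matching the Sort/HashWindowAggregate
        // operators in the plans below.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // DDL from the issue; the connector of `source` is elided there,
        // so a bounded 'datagen' table is assumed purely for reproduction.
        tEnv.executeSql(
                "CREATE TABLE source ("
                        + "  id STRING,"
                        + "  ts TIMESTAMP(3),"
                        + "  v BIGINT,"
                        + "  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND"
                        + ") WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql(
                "CREATE VIEW source_distinct AS "
                        + "SELECT * FROM ("
                        + "  SELECT *, ROW_NUMBER() OVER w AS row_nu"
                        + "  FROM source"
                        + "  WINDOW w AS (PARTITION BY id ORDER BY proctime() ASC)"
                        + ") WHERE row_nu = 1");
        tEnv.executeSql(
                "CREATE TABLE print1 (id STRING, ts TIMESTAMP(3))"
                        + " WITH ('connector' = 'blackhole')");
        tEnv.executeSql(
                "CREATE TABLE print2 (id STRING, ts TIMESTAMP(3), v BIGINT)"
                        + " WITH ('connector' = 'blackhole')");

        // Both INSERTs go through one StatementSet; only then are they
        // optimized together, giving the planner a chance to reuse the
        // deduplicate sub-plan across the two sinks.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO print1 SELECT id, ts FROM source_distinct");
        set.addInsertSql(
                "INSERT INTO print2"
                        + " SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v)"
                        + " FROM source_distinct"
                        + " GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id");

        // Prints the AST, optimized physical plan, and optimized execution plan.
        System.out.println(set.explain());
    }
}
{code}
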
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- LogicalProject(id=[$0], ts=[$1])
   +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[$3])
      +- LogicalFilter(condition=[=($3, 1)])
         +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
            +- LogicalTableScan(table=[[default_catalog, default_database, source]])

LogicalSink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, EXPR$2])
+- LogicalProject(id=[$1], EXPR$1=[TUMBLE_START($0)], EXPR$2=[$2])
   +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
      +- LogicalProject($f0=[$TUMBLE($1, 20000:INTERVAL SECOND)], id=[$0], v=[$2])
         +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[$3])
            +- LogicalFilter(condition=[=($3, 1)])
               +- LogicalProject(id=[$0], ts=[$1], v=[$2], row_nu=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
                  +- LogicalTableScan(table=[[default_catalog, default_database, source]])
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- Calc(select=[id, ts], where=[=(w0$o0, 1)])
   +- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, $3, w0$o0])
      +- Sort(orderBy=[id ASC, $3 ASC])
         +- Calc(select=[id, ts, v, PROCTIME() AS $3])
            +- Exchange(distribution=[hash[id]])
               +- TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, ts, v])

Sink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, EXPR$2])
+- Calc(select=[id, w$start AS EXPR$1, EXPR$2])
   +- HashWindowAggregate(groupBy=[id], window=[TumblingGroupWindow('w$, ts, 20000)], properties=[w$start, w$end, w$rowtime], select=[id, SUM(v) AS EXPR$2])
      +- Calc(select=[ts, id, v], where=[=(w0$o0, 1)])
         +- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, $3, w0$o0])
            +- Sort(orderBy=[id ASC, $3 ASC])
               +- Calc(select=[id, ts, v, PROCTIME() AS $3])
                  +- Exchange(distribution=[hash[id]])
                     +- TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, ts, v])
== Optimized Execution Plan ==
OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, ts, v, $3, w0$o0])(reuse_id=[1])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[id ASC, $3 ASC])
      +- Exchange(distribution=[keep_input_as_is[hash[id]]])
         +- Calc(select=[id, ts, v, PROCTIME() AS $3])
            +- Exchange(distribution=[hash[id]])
               +- TableSourceScan(table=[[default_catalog, default_database, source]], fields=[id, ts, v])

Sink(table=[default_catalog.default_database.print1], fields=[id, ts])
+- Calc(select=[id, ts], where=[(w0$o0 = 1)])
   +- Reused(reference_id=[1])

Sink(table=[default_catalog.default_database.print2], fields=[id, EXPR$1, EXPR$2])
+- Calc(select=[id, w$start AS EXPR$1, EXPR$2])
   +- HashWindowAggregate(groupBy=[id], window=[TumblingGroupWindow('w$, ts, 20000)], properties=[w$start, w$end, w$rowtime], select=[id, SUM(v) AS EXPR$2])
      +- Exchange(distribution=[keep_input_as_is[hash[id]]])
         +- Calc(select=[ts, id, v], where=[(w0$o0 = 1)])
            +- Exchange(distribution=[keep_input_as_is[hash[ts]]])
               +- Reused(reference_id=[1])
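
The reuse can also be checked programmatically, continuing from the {{set}} in
the sketch above; the markers are literal strings in the explain output:

{code:java}
String plan = set.explain();
// The shared deduplicate is printed once with reuse_id=[1] and referenced
// from both sink branches via Reused(reference_id=[1]).
if (!plan.contains("reuse_id=[1]") || !plan.contains("Reused(reference_id=[1])")) {
    throw new AssertionError("deduplicate sub-plan was not reused across the sinks");
}
{code}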
> Public operators cannot be reused in multi sinks
> ------------------------------------------------
>
> Key: FLINK-33670
> URL: https://issues.apache.org/jira/browse/FLINK-33670
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.18.0
> Reporter: Lyn Zhang
> Priority: Major
> Attachments: image-2023-11-28-14-31-30-153.png
>
>
> Dear all:
> I find that some public operators cannot be reused when submitting a job with
> multiple sinks. Here is an example:
> {code:java}
> CREATE TABLE source (
>     id STRING,
>     ts TIMESTAMP(3),
>     v BIGINT,
>     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> ) WITH (...);
>
> CREATE VIEW source_distinct AS
> SELECT * FROM (
>     SELECT *, ROW_NUMBER() OVER w AS row_nu
>     FROM source
>     WINDOW w AS (PARTITION BY id ORDER BY proctime() ASC)
> ) WHERE row_nu = 1;
>
> CREATE TABLE print1 (
>     id STRING,
>     ts TIMESTAMP(3)
> ) WITH ('connector' = 'blackhole');
>
> INSERT INTO print1 SELECT id, ts FROM source_distinct;
>
> CREATE TABLE print2 (
>     id STRING,
>     ts TIMESTAMP(3),
>     v BIGINT
> ) WITH ('connector' = 'blackhole');
>
> INSERT INTO print2
> SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v)
> FROM source_distinct
> GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id;
> {code}
> !image-2023-11-28-14-31-30-153.png|width=384,height=145!
> I tried to check the code: Flink adds the CoreRules.PROJECT_MERGE rule by
> default, which produces different digests for the deduplicate operator in the
> two branches, so the planner finally fails to match the same sub-plan.
> In a real production environment, reusing a common sub-plan such as the
> deduplicate is more valuable than merging projects. A good solution would be
> to stop project merges from crossing shuffle operators in multi-sink cases
> (see the configuration sketch below).
> What do you think? Looking forward to your reply.
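>
> For context, a minimal sketch of the closest existing knobs I am aware of;
> neither interrupts PROJECT_MERGE itself, sub-plan reuse is already on by
> default, and the second option is experimental, so please check that the key
> exists in your Flink version:
> {code:java}
> // Sub-plan reuse (default true); shown explicitly for clarity.
> tEnv.getConfig().set("table.optimizer.reuse-sub-plan-enabled", "true");
>
> // Experimental: reuse whole optimize blocks that share a digest, so the
> // deduplicate view can stay one shared block feeding both sinks instead of
> // being project-merged (and re-digested) separately into each branch.
> tEnv.getConfig().set("table.optimizer.reuse-optimize-block-with-digest-enabled", "true");
> {code}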
--
This message was sent by Atlassian Jira
(v8.20.10#820010)