[ https://issues.apache.org/jira/browse/FLINK-33670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17791426#comment-17791426 ]

Lyn Zhang commented on FLINK-33670:
-----------------------------------

Treating a "temporary view" as a real "common table expression" throughout the whole
optimization process would be an ideal solution to this issue, and it can be
controlled by SQL developers.

I would like to know what stage the CTE feature is currently at.

> Public operators cannot be reused in multi sinks
> ------------------------------------------------
>
>                 Key: FLINK-33670
>                 URL: https://issues.apache.org/jira/browse/FLINK-33670
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Lyn Zhang
>            Priority: Major
>         Attachments: image-2023-11-28-14-31-30-153.png
>
>
> Dear all:
>    I find that some public operators cannot be reused when submitting a job with
> multiple sinks. Here is an example:
> {code:java}
> CREATE TABLE source (
>     id              STRING,
>     ts              TIMESTAMP(3),
>     v              BIGINT,
>     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> ) WITH (...);
> CREATE VIEW source_distinct AS
> SELECT * FROM (
>     SELECT *, ROW_NUMBER() OVER w AS row_nu
>     FROM source
>     WINDOW w AS (PARTITION BY id ORDER BY proctime() ASC)
> ) WHERE row_nu = 1;
> CREATE TABLE print1 (
>      id             STRING,
>      ts             TIMESTAMP(3)
> ) WITH('connector' = 'blackhole');
> INSERT INTO print1 SELECT id, ts FROM source_distinct;
> CREATE TABLE print2 (
>      id              STRING,
>      ts              TIMESTAMP(3),
>      v               BIGINT
> ) WITH('connector' = 'blackhole');
> INSERT INTO print2
> SELECT id, TUMBLE_START(ts, INTERVAL '20' SECOND), SUM(v)
> FROM source_distinct
> GROUP BY TUMBLE(ts, INTERVAL '20' SECOND), id; {code}
> !image-2023-11-28-14-31-30-153.png|width=384,height=145!
> I checked the code: Flink adds the rule CoreRules.PROJECT_MERGE by default.
> This produces different digests for the deduplicate operator, so the planner
> ultimately fails to match the identical sub-plan.
> In a real production environment, reusing a common sub-plan such as the
> deduplication is more valuable than merging projections. A good solution would
> be to prevent project merging across shuffle operators in multi-sink cases.
> What do you think? Looking forward to your reply.
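The digest mismatch described in the issue can be illustrated with a small toy model. This is a hypothetical sketch, not Flink's planner code: the class SubPlanReuseSketch and the helpers dedupDigest, prunedInput and distinctCount are invented for illustration, sub-plan reuse is modeled simply as "identical digest strings share one operator", and it assumes (as the report suggests) that once each sink's projection is merged into the shared view, the deduplicate node receives a different input field list per sink.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

// Hypothetical toy model, NOT Flink's planner: a plan node is identified only by
// its digest string, and sub-plan reuse means "identical digests share one node".
class SubPlanReuseSketch {

    // Schema of the source_distinct view as seen by the sinks (row_nu omitted).
    static final List<String> VIEW_SCHEMA = List.of("id", "ts", "v");

    // Digest of the deduplicate node; it embeds the field list of its input.
    static String dedupDigest(List<String> inputColumns) {
        return "Deduplicate(key=[id], keepFirstRow)<-Project" + inputColumns + "<-source";
    }

    // Assumed effect of merging a sink's projection into the shared view:
    // only the columns that sink needs (plus the partition key "id") reach the
    // deduplicate node, kept in view-schema order.
    static List<String> prunedInput(List<String> sinkColumns) {
        List<String> kept = new ArrayList<>();
        for (String col : VIEW_SCHEMA) {
            if (col.equals("id") || sinkColumns.contains(col)) {
                kept.add(col);
            }
        }
        return kept;
    }

    // Number of deduplicate operators left after reusing identical digests.
    static int distinctCount(List<String> digests) {
        return new HashSet<>(digests).size();
    }

    public static void main(String[] args) {
        List<String> print1Columns = List.of("id", "ts");      // INSERT INTO print1
        List<String> print2Columns = List.of("id", "ts", "v"); // INSERT INTO print2

        // Projections kept above the view: both sinks see the same deduplicate digest.
        List<String> unmerged = List.of(dedupDigest(VIEW_SCHEMA), dedupDigest(VIEW_SCHEMA));

        // Projections merged into the view per sink: the digests diverge.
        List<String> merged = List.of(
                dedupDigest(prunedInput(print1Columns)),
                dedupDigest(prunedInput(print2Columns)));

        System.out.println("deduplicate operators without merge: " + distinctCount(unmerged)); // 1
        System.out.println("deduplicate operators with merge: " + distinctCount(merged));      // 2
    }
}
```

Under these assumptions the model keeps one deduplicate operator when the sink projections stay above the shared view and two once they are merged into it, which is the situation that stopping project merging at the shuffle (or planning the view once as a CTE) is meant to avoid.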



--
This message was sent by Atlassian Jira
(v8.20.10#820010)