[ https://issues.apache.org/jira/browse/FLINK-20289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-20289:
-----------------------------------
Labels: auto-deprioritized-major stale-minor (was: auto-deprioritized-major)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> Computed columns can be calculated after ChangelogNormalize to reduce shuffle
> -----------------------------------------------------------------------------
>
> Key: FLINK-20289
> URL: https://issues.apache.org/jira/browse/FLINK-20289
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Reporter: Jark Wu
> Priority: Minor
> Labels: auto-deprioritized-major, stale-minor
>
> In FLINK-19878, we changed the plan so that ChangelogNormalize is applied after
> WatermarkAssigner, keeping the watermark close to the source. This makes the
> watermark more fine-grained.
> However, in some cases this shuffles more data, because all computed column
> expressions are evaluated before ChangelogNormalize, i.e. before the Exchange
> that it requires. In the following example, {{a+1}} could be evaluated after
> ChangelogNormalize to reduce the amount of shuffled data.
> {code:sql}
> CREATE TABLE src (
>   id STRING,
>   a INT,
>   b AS a + 1,
>   c STRING,
>   ts AS TO_TIMESTAMP(c),
>   PRIMARY KEY (id) NOT ENFORCED,
>   WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
> ) WITH (
>   'connector' = 'values',
>   'changelog-mode' = 'UA,D'
> );
> SELECT a, b, c FROM src WHERE a > 1;
> {code}
> The current plan is:
> {code}
> Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
> +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
>    +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>       +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
>          +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
>             +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
> {code}
> A better plan would evaluate {{a+1}} in the final Calc, after ChangelogNormalize:
> {code}
> Calc(select=[a, +(a, 1) AS b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
> +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
>    +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>       +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
>          +- Calc(select=[id, a, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
>             +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
> {code}
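The effect can be illustrated with a small Python toy model. This is only a sketch under stated assumptions: `normalize` is a simplified, hypothetical stand-in for ChangelogNormalize (not Flink's implementation), `shuffle` models the `hash[id]` Exchange, `PARALLELISM` is a made-up task count, the number of fields sent is a rough proxy for shuffled bytes, and `ts` is omitted because both plans carry it. Since {{b AS a + 1}} is deterministic and depends only on its own row, evaluating it after the normalization should yield the same changelog while one field fewer per record crosses the network.

```python
from collections import defaultdict

PARALLELISM = 2  # hypothetical number of downstream ChangelogNormalize tasks


def add_b(row):
    """Computed column `b AS a + 1`: deterministic and row-local."""
    return {**row, "b": row["a"] + 1}


def shuffle(changelog, key):
    """Toy model of Exchange(distribution=[hash[id]]).

    Returns the partitions and the total number of fields sent downstream.
    """
    partitions = defaultdict(list)
    fields_sent = 0
    for kind, row in changelog:
        partitions[hash(row[key]) % PARALLELISM].append((kind, row))
        fields_sent += len(row)
    return partitions, fields_sent


def normalize(changelog, key):
    """Hypothetical stand-in for ChangelogNormalize: turns an upsert stream
    (UA, D) into a full changelog (I, UB, UA, D) by keeping the last row per key."""
    state, out = {}, []
    for kind, row in changelog:
        prev = state.get(row[key])
        if kind == "UA":
            if prev is None:
                out.append(("I", row))
            elif prev != row:
                out += [("UB", prev), ("UA", row)]
            state[row[key]] = row
        elif kind == "D" and prev is not None:
            out.append(("D", prev))
            del state[row[key]]
    return out


def run(source, compute_b_before_shuffle):
    """Evaluate `SELECT a, b, c FROM src WHERE a > 1` with `b` computed
    either before the shuffle (current plan) or after normalization (better plan)."""
    if compute_b_before_shuffle:
        rows = [(kind, add_b(row)) for kind, row in source]
    else:
        rows = source
    partitions, fields_sent = shuffle(rows, "id")
    result = []
    for p in sorted(partitions):
        for kind, row in normalize(partitions[p], "id"):
            if row["a"] > 1:  # WHERE a > 1
                if not compute_b_before_shuffle:
                    row = add_b(row)  # b evaluated after ChangelogNormalize
                result.append((kind, (row["a"], row["b"], row["c"])))
    return result, fields_sent


# Upsert source with changelogMode=[UA,D] (columns id, a, c only).
source = [
    ("UA", {"id": "k1", "a": 1, "c": "x"}),
    ("UA", {"id": "k2", "a": 5, "c": "y"}),
    ("UA", {"id": "k1", "a": 3, "c": "x"}),
    ("D", {"id": "k2", "a": 5, "c": "y"}),
]
current, current_fields = run(source, compute_b_before_shuffle=True)
better, better_fields = run(source, compute_b_before_shuffle=False)
print(current == better, current_fields, better_fields)
```

Running it prints `True 16 12`: in this toy model both plans emit the same changelog, but the current plan ships four fields per record through the shuffle instead of three.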
--
This message was sent by Atlassian Jira
(v8.3.4#803005)