godfreyhe commented on a change in pull request #15997: URL: https://github.com/apache/flink/pull/15997#discussion_r640260862
########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.physical.stream + +import org.apache.flink.table.api.ExplainDetail +import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableTestBase} + +import org.junit.{Before, Test} + +/** + * Tests for [[WatermarkAssignerChangelogNormalizeTransposeRule]] + */ +class WatermarkAssignerChangelogNormalizeTransposeRuleTest extends TableTestBase { + private val util: StreamTableTestUtil = streamTestUtil() + + @Before + def setup(): Unit = { + util.addTable( + s""" + |CREATE TABLE simple_src ( + | currency STRING, + | currency_no STRING, + | rate BIGINT, + | currency_time TIMESTAMP(3), + | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, + | PRIMARY KEY(currency) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D', + | 'enable-watermark-push-down' = 'true' + |) + |""".stripMargin) + + util.addTable( + s""" + |CREATE TABLE src_with_computed_column ( + | currency STRING, + | currency_no STRING, + | rate BIGINT, + | c STRING, + | currency_time as to_timestamp(c), + | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, + | PRIMARY KEY(currency) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D', + | 'enable-watermark-push-down' = 'true' + |) + |""".stripMargin) + } + + // ---------------------------------------------------------------------------------------- + // Tests for queries matches WITHOUT_CALC patten + // Rewrite always happens in the case + // ---------------------------------------------------------------------------------------- + @Test + def testPushdownWatermarkWithoutCalc(): Unit = { + val sql = + """ + |SELECT + |currency, + |COUNT(1) AS cnt, + |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, + |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end + |FROM simple_src + |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND) + |""".stripMargin + util.verifyRelPlan(sql, 
ExplainDetail.CHANGELOG_MODE) + } + + // ---------------------------------------------------------------------------------------- + // Tests for queries matches WITH_CALC patten + // ---------------------------------------------------------------------------------------- + + /** push down calc and watermark assigner as a whole if shuffle keys are kept after Calc. */ + @Test + def testPushdownCalcAndWatermarkAssignerWithCalc(): Unit = { + val sql = + """ + |SELECT + |currency, + |COUNT(1) AS cnt, + |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, + |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end + |FROM src_with_computed_column + |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND) + |""".stripMargin + util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) + } + + /** only push down watermark assigner if satisfy all the following condition: + * 1. shuffle keys are not kept after Calc + * 2. row time field does not depend on computed column + */ + @Test + def testPushdownWatermarkAssignerWithCalc(): Unit = { + val sql = + """ + |SELECT + |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, + |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end, + |MAX(rate) AS max_rate + |FROM simple_src + |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND) + |""".stripMargin + util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) + } + + /** + * rewrites has no effect if does not satisfy any of the following condition: + * 1. shuffle keys are all kept after Calc + * 2. 
row time field does not depend on computed column + */ + @Test + def testNothingHappenWithCalc(): Unit = { + val sql = + """ + |SELECT + |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, + |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end, + |MAX(rate) AS max_rate + |FROM src_with_computed_column + |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND) + |""".stripMargin + util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) + } +} Review comment: It would be better to add a test in which the group key comes from a computed column, such as ` util.addTable( s""" |CREATE TABLE src_with_computed_column2 ( | currency int, | currency2 as currency + 2, | currency_no STRING, | rate BIGINT, | c STRING, | currency_time as to_timestamp(c), | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, | PRIMARY KEY(currency) NOT ENFORCED |) WITH ( | 'connector' = 'values', | 'changelog-mode' = 'UA,D', | 'enable-watermark-push-down' = 'true' |) |""".stripMargin) val sql = """ |SELECT |currency2, |COUNT(1) AS cnt, |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end |FROM src_with_computed_column2 |GROUP BY currency2, TUMBLE(currency_time, INTERVAL '5' SECOND) |""".stripMargin util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) ` ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java ########## @@ -67,18 +84,96 @@ public void onMatch(RelOptRuleCall call) { final StreamPhysicalCalc calc = call.rel(1); final StreamPhysicalChangelogNormalize changelogNormalize = call.rel(2); final StreamPhysicalExchange exchange = call.rel(3); - - final RelNode newTree = - buildTreeInOrder( - changelogNormalize, exchange, watermark, calc, exchange.getInput()); - call.transformTo(newTree); + final Mappings.TargetMapping calcMapping = buildCalcMapping(calc); Review comment: the added code can be
extracted into a new method; the `onMatch` method would then be cleaner. ########## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.xml ########## @@ -0,0 +1,154 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<Root> + <TestCase name="testNothingHappenWithCalc"> + <Resource name="sql"> + <![CDATA[ +SELECT +TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, +TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end, +MAX(rate) AS max_rate +FROM src_with_computed_column +GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND) +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(w_start=[TUMBLE_START($0)], w_end=[TUMBLE_END($0)], max_rate=[$1]) ++- LogicalAggregate(group=[{0}], max_rate=[MAX($1)]) + +- LogicalProject($f0=[$TUMBLE($4, 5000:INTERVAL SECOND)], rate=[$2]) + +- LogicalWatermarkAssigner(rowtime=[currency_time], watermark=[-($4, 5000:INTERVAL SECOND)]) + +- LogicalProject(currency=[$0], currency_no=[$1], rate=[$2], c=[$3], currency_time=[TO_TIMESTAMP($3)]) + +- LogicalTableScan(table=[[default_catalog, default_database, src_with_computed_column]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[w$start AS w_start, w$end AS w_end, max_rate], changelogMode=[I]) ++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, currency_time, 5000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[MAX(rate) AS max_rate, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], changelogMode=[I]) + +- Exchange(distribution=[single], changelogMode=[I,UB,UA,D]) + +- WatermarkAssigner(rowtime=[currency_time], watermark=[-(currency_time, 5000:INTERVAL SECOND)], changelogMode=[I,UB,UA,D]) Review comment: The watermark can always be pushed down close to the TableSourceScan (possibly together with a Calc), so that PushWatermarkIntoTableSourceScanRule can then be applied. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
