beyond1920 commented on a change in pull request #15997: URL: https://github.com/apache/flink/pull/15997#discussion_r640267297
########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.physical.stream + +import org.apache.flink.table.api.ExplainDetail +import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableTestBase} + +import org.junit.{Before, Test} + +/** + * Tests for [[WatermarkAssignerChangelogNormalizeTransposeRule]] + */ +class WatermarkAssignerChangelogNormalizeTransposeRuleTest extends TableTestBase { + private val util: StreamTableTestUtil = streamTestUtil() + + @Before + def setup(): Unit = { + util.addTable( + s""" + |CREATE TABLE simple_src ( + | currency STRING, + | currency_no STRING, + | rate BIGINT, + | currency_time TIMESTAMP(3), + | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, + | PRIMARY KEY(currency) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D', + | 'enable-watermark-push-down' = 'true' + |) + |""".stripMargin) + + util.addTable( + s""" + |CREATE TABLE src_with_computed_column ( + | currency STRING, + | currency_no STRING, + | rate BIGINT, + | c STRING, + | currency_time as to_timestamp(c), + | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, + | PRIMARY KEY(currency) NOT ENFORCED + |) WITH ( + | 'connector' = 'values', + | 'changelog-mode' = 'UA,D', + | 'enable-watermark-push-down' = 'true' + |) + |""".stripMargin) + } + + // ---------------------------------------------------------------------------------------- + // Tests for queries matches WITHOUT_CALC patten + // Rewrite always happens in the case + // ---------------------------------------------------------------------------------------- + @Test + def testPushdownWatermarkWithoutCalc(): Unit = { + val sql = + """ + |SELECT + |currency, + |COUNT(1) AS cnt, + |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, + |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end + |FROM simple_src + |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND) + |""".stripMargin + util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) + } + + // ---------------------------------------------------------------------------------------- + // Tests for queries matches WITH_CALC patten + // ---------------------------------------------------------------------------------------- + + /** push down calc and watermark assigner as a whole if shuffle keys are kept after Calc. */ + @Test + def testPushdownCalcAndWatermarkAssignerWithCalc(): Unit = { + val sql = + """ + |SELECT + |currency, + |COUNT(1) AS cnt, + |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, + |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end + |FROM src_with_computed_column + |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND) + |""".stripMargin + util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) + } + + /** only push down watermark assigner if satisfy all the following condition: + * 1. shuffle keys are not kept after Calc + * 2. row time field does not depend on computed column + */ + @Test + def testPushdownWatermarkAssignerWithCalc(): Unit = { + val sql = + """ + |SELECT + |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, + |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end, + |MAX(rate) AS max_rate + |FROM simple_src + |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND) + |""".stripMargin + util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) + } + + /** + * rewrites has no effect if does not satisfy any of the following condition: + * 1. shuffle keys are all kept after Calc + * 2. row time field does not depend on computed column + */ + @Test + def testNothingHappenWithCalc(): Unit = { + val sql = + """ + |SELECT + |TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start, + |TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end, + |MAX(rate) AS max_rate + |FROM src_with_computed_column + |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND) + |""".stripMargin + util.verifyRelPlan(sql, ExplainDetail.CHANGELOG_MODE) + } +} Review comment: good point ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java ########## @@ -67,18 +84,96 @@ public void onMatch(RelOptRuleCall call) { final StreamPhysicalCalc calc = call.rel(1); final StreamPhysicalChangelogNormalize changelogNormalize = call.rel(2); final StreamPhysicalExchange exchange = call.rel(3); - - final RelNode newTree = - buildTreeInOrder( - changelogNormalize, exchange, watermark, calc, exchange.getInput()); - call.transformTo(newTree); + final Mappings.TargetMapping calcMapping = buildCalcMapping(calc); Review comment: good point -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
