godfreyhe commented on a change in pull request #13449:
URL: https://github.com/apache/flink/pull/13449#discussion_r497224466
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExpressionReducer.scala
##########
@@ -249,13 +249,15 @@ class ExpressionReducer(
/**
* Constant expression code generator context.
*/
-class ConstantCodeGeneratorContext(tableConfig: TableConfig)
+class ConstantCodeGeneratorContext(
+ tableConfig: TableConfig,
+ contextTerm: String = "parameters")
Review comment:
nit: fix the indentation of these constructor parameters
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
##########
@@ -18,14 +18,16 @@
package org.apache.flink.table.planner.codegen
+import org.apache.calcite.rex.RexNode
Review comment:
nit: reorder imports
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
##########
@@ -44,11 +47,33 @@ object WatermarkGeneratorCodeGenerator {
" but is " + watermarkOutputType)
}
val funcName = newName("WatermarkGenerator")
- val ctx = CodeGeneratorContext(config)
+ val ctx = if (contextTerm != null) {
+ new ConstantCodeGeneratorContext(config, contextTerm)
Review comment:
Why do we use `ConstantCodeGeneratorContext` here?
`ConstantCodeGeneratorContext` is used for the constant reducer; we should create a
new, dedicated `CodeGeneratorContext` for the watermark generator.
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRule.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule for PushWatermarkIntoTableSourceScan.
Review comment:
please add some comments to explain the purpose of this rule
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRule.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule for PushWatermarkIntoTableSourceScan.
+ * */
+public class PushWatermarkIntoTableSourceScanRule extends RelOptRule {
+ public static final PushWatermarkIntoTableSourceScanRule INSTANCE = new
PushWatermarkIntoTableSourceScanRule();
+
+ public PushWatermarkIntoTableSourceScanRule() {
+ super(operand(LogicalWatermarkAssigner.class,
+ operand(LogicalTableScan.class, none())),
+ "PushWatermarkIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(1);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ return tableSourceTable != null &&
tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+ FlinkContext context = (FlinkContext)
call.getPlanner().getContext();
+ TableConfig config = context.getTableConfig();
+
+ // generate an inner watermark generator class that allows us
to pass FunctionContext and ClassLoader
+ GeneratedWatermarkGenerator generatedWatermarkGenerator =
+
WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+ config,
+
FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+
watermarkAssigner.watermarkExpr(),
+ "context");
+ Configuration configuration =
context.getTableConfig().getConfiguration();
+
+ WatermarkGeneratorSupplier<RowData> supplier = new
DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+ String digest = String.format("watermark=[%s]",
watermarkAssigner.watermarkExpr());
+
+ WatermarkStrategy<RowData> watermarkStrategy =
WatermarkStrategy.forGenerator(supplier);
+ Duration idleTimeout =
configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+ if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+ watermarkStrategy.withIdleness(idleTimeout);
+ digest = String.format("%s idletimeout=[%s]", digest,
idleTimeout.toMillis());
+ }
+
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ DynamicTableSource newDynamicTableSource =
tableSourceTable.tableSource().copy();
+
+ ((SupportsWatermarkPushDown)
newDynamicTableSource).applyWatermark(watermarkStrategy);
+
+ TableSourceTable newTableSourceTable = tableSourceTable.copy(
+ newDynamicTableSource,
+ watermarkAssigner.getRowType(),
+ new String[]{digest});
+ LogicalTableScan newScan = new LogicalTableScan(
+ scan.getCluster(), scan.getTraitSet(),
scan.getHints(), newTableSourceTable);
+
+ call.transformTo(newScan);
+ }
+
+ private static class DefaultWatermarkGeneratorSupplier implements
WatermarkGeneratorSupplier<RowData> {
Review comment:
add `serialVersionUID` field
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRule.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule for PushWatermarkIntoTableSourceScan.
+ * */
+public class PushWatermarkIntoTableSourceScanRule extends RelOptRule {
+ public static final PushWatermarkIntoTableSourceScanRule INSTANCE = new
PushWatermarkIntoTableSourceScanRule();
+
+ public PushWatermarkIntoTableSourceScanRule() {
+ super(operand(LogicalWatermarkAssigner.class,
+ operand(LogicalTableScan.class, none())),
+ "PushWatermarkIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(1);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ return tableSourceTable != null &&
tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+ FlinkContext context = (FlinkContext)
call.getPlanner().getContext();
+ TableConfig config = context.getTableConfig();
+
+ // generate an inner watermark generator class that allows us
to pass FunctionContext and ClassLoader
+ GeneratedWatermarkGenerator generatedWatermarkGenerator =
+
WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+ config,
+
FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+
watermarkAssigner.watermarkExpr(),
+ "context");
+ Configuration configuration =
context.getTableConfig().getConfiguration();
+
+ WatermarkGeneratorSupplier<RowData> supplier = new
DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+ String digest = String.format("watermark=[%s]",
watermarkAssigner.watermarkExpr());
+
+ WatermarkStrategy<RowData> watermarkStrategy =
WatermarkStrategy.forGenerator(supplier);
+ Duration idleTimeout =
configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+ if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+ watermarkStrategy.withIdleness(idleTimeout);
+ digest = String.format("%s idletimeout=[%s]", digest,
idleTimeout.toMillis());
+ }
+
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ DynamicTableSource newDynamicTableSource =
tableSourceTable.tableSource().copy();
+
+ ((SupportsWatermarkPushDown)
newDynamicTableSource).applyWatermark(watermarkStrategy);
+
+ TableSourceTable newTableSourceTable = tableSourceTable.copy(
+ newDynamicTableSource,
+ watermarkAssigner.getRowType(),
+ new String[]{digest});
+ LogicalTableScan newScan = new LogicalTableScan(
+ scan.getCluster(), scan.getTraitSet(),
scan.getHints(), newTableSourceTable);
+
+ call.transformTo(newScan);
+ }
+
+ private static class DefaultWatermarkGeneratorSupplier implements
WatermarkGeneratorSupplier<RowData> {
+ private final Configuration configuration;
+ private final GeneratedWatermarkGenerator
generatedWatermarkGenerator;
+
+ public DefaultWatermarkGeneratorSupplier(Configuration
configuration, GeneratedWatermarkGenerator generatedWatermarkGenerator) {
+ this.configuration = configuration;
+ this.generatedWatermarkGenerator =
generatedWatermarkGenerator;
+ }
+
+ @Override
+ public WatermarkGenerator<RowData>
createWatermarkGenerator(Context context) {
+
+ List<Object> references = new
ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences()));
+ references.add(context);
+
+
org.apache.flink.table.runtime.generated.WatermarkGenerator
innerWatermarkGenerator =
+ new GeneratedWatermarkGenerator(
+
generatedWatermarkGenerator.getClassName(),
+
generatedWatermarkGenerator.getCode(),
+ references.toArray())
+
.newInstance(Thread.currentThread().getContextClassLoader());
+
+ try {
+ innerWatermarkGenerator.open(configuration);
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to instantiate
generated watermark generator.", e);
+ }
+ return new
DefaultWatermarkGenerator(innerWatermarkGenerator);
+ }
+
+ private class DefaultWatermarkGenerator implements
WatermarkGenerator<RowData> {
Review comment:
add the `static` modifier and add a `serialVersionUID` field
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRule.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Rule for PushWatermarkIntoTableSourceScan.
+ * */
+public class PushWatermarkIntoTableSourceScanRule extends RelOptRule {
+ public static final PushWatermarkIntoTableSourceScanRule INSTANCE = new
PushWatermarkIntoTableSourceScanRule();
+
+ public PushWatermarkIntoTableSourceScanRule() {
+ super(operand(LogicalWatermarkAssigner.class,
+ operand(LogicalTableScan.class, none())),
+ "PushWatermarkIntoTableSourceScanRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(1);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ return tableSourceTable != null &&
tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+ LogicalTableScan scan = call.rel(1);
+ FlinkContext context = (FlinkContext)
call.getPlanner().getContext();
+ TableConfig config = context.getTableConfig();
+
+ // generate an inner watermark generator class that allows us
to pass FunctionContext and ClassLoader
+ GeneratedWatermarkGenerator generatedWatermarkGenerator =
+
WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+ config,
+
FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+
watermarkAssigner.watermarkExpr(),
+ "context");
+ Configuration configuration =
context.getTableConfig().getConfiguration();
+
+ WatermarkGeneratorSupplier<RowData> supplier = new
DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+ String digest = String.format("watermark=[%s]",
watermarkAssigner.watermarkExpr());
+
+ WatermarkStrategy<RowData> watermarkStrategy =
WatermarkStrategy.forGenerator(supplier);
+ Duration idleTimeout =
configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+ if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+ watermarkStrategy.withIdleness(idleTimeout);
+ digest = String.format("%s idletimeout=[%s]", digest,
idleTimeout.toMillis());
+ }
+
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ DynamicTableSource newDynamicTableSource =
tableSourceTable.tableSource().copy();
+
+ ((SupportsWatermarkPushDown)
newDynamicTableSource).applyWatermark(watermarkStrategy);
+
+ TableSourceTable newTableSourceTable = tableSourceTable.copy(
+ newDynamicTableSource,
+ watermarkAssigner.getRowType(),
+ new String[]{digest});
+ LogicalTableScan newScan = new LogicalTableScan(
Review comment:
nit: use `LogicalTableScan.create` instead
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Test rule PushWatermarkIntoTableSourceScanRule.
+ * */
+public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
+ private StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+ @Before
+ public void setup() {
+ util.buildStreamProgram(FlinkStreamProgram.DEFAULT_REWRITE());
+ CalciteConfig calciteConfig =
TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+ calciteConfig.getStreamProgram().get().addLast(
+ "PushWatermarkIntoTableSourceScanRule",
+
FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder()
+
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(
+
WatermarkAssignerProjectTransposeRule.INSTANCE,
+
PushWatermarkIntoTableSourceScanRule.INSTANCE))
+ .build()
+ );
+ }
+
+ @Test
+ public void testSimpleWatermark() {
+ String ddl = "create table MyTable(" +
+ " a int,\n" +
+ " b bigint,\n" +
+ " c timestamp(3),\n" +
+ " watermark for c as c - interval '5'
second\n" +
+ ") WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'table-source-class' =
'WATERMARK_PUSH_DOWN',\n" +
+ " 'bounded' = 'false'\n" +
+ ")";
+ util.tableEnv().executeSql(ddl);
+ util.verifyPlan("select * from MyTable");
+ }
+
+ @Test
+ public void testSimpleTranspose() {
+ String ddl = "create table MyTable(" +
+ " a int,\n" +
+ " b bigint,\n" +
+ " c timestamp(3),\n" +
+ " d as c + interval '5' second,\n" +
+ " watermark for d as d - interval '5'
second\n" +
+ ") WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'table-source-class' =
'WATERMARK_PUSH_DOWN',\n" +
+ " 'bounded' = 'false'\n" +
+ ")";
+ util.tableEnv().executeSql(ddl);
+ util.verifyPlan("select * from MyTable");
+ }
+
+ @Test
+ public void testSimpleTransposeNotNull() {
+ String ddl = "create table MyTable(" +
+ " a int,\n" +
+ " b bigint,\n" +
+ " c timestamp(3) not null,\n" +
+ " d as c + interval '5' second,\n" +
+ " watermark for d as d - interval '5'
second\n" +
+ ") WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'table-source-class' =
'WATERMARK_PUSH_DOWN',\n" +
+ " 'bounded' = 'false'\n" +
+ ")";
+ util.tableEnv().executeSql(ddl);
+ util.verifyPlan("select * from MyTable");
+ }
+
+ @Test
+ public void testComputedColumnWithMultipleInputs() {
+ String ddl = "create table MyTable(" +
+ " a string,\n" +
+ " b string,\n" +
+ " c as to_timestamp(a, b),\n" +
+ " watermark for c as c - interval '5'
second\n" +
+ ") WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'table-source-class' =
'WATERMARK_PUSH_DOWN',\n" +
+ " 'bounded' = 'false'\n" +
+ ")";
+ util.tableEnv().executeSql(ddl);
+ util.verifyPlan("select * from MyTable");
+ }
+
+ @Test
+ public void testTransposeWithRow() {
+ String ddl = "create table MyTable(" +
+ " a int,\n" +
+ " b bigint,\n" +
+ " c row<name string, d timestamp(3)>," +
+ " e as c.d," +
+ " watermark for e as e - interval '5'
second\n" +
+ ") WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'table-source-class' =
'WATERMARK_PUSH_DOWN',\n" +
+ " 'bounded' = 'false'\n" +
+ ")";
+ util.tableEnv().executeSql(ddl);
+ util.verifyPlan("select * from MyTable");
+ }
+
+ @Test
+ public void testTransposeWithNestedRow() {
+ String ddl = "create table MyTable(" +
+ " a int,\n" +
+ " b bigint,\n" +
+ " c row<name string, d row<e string, f
timestamp(3)>>," +
+ " g as c.d.f," +
+ " watermark for g as g - interval '5'
second\n" +
+ ") WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'table-source-class' =
'WATERMARK_PUSH_DOWN',\n" +
+ " 'bounded' = 'false'\n" +
+ ")";
+ util.tableEnv().executeSql(ddl);
+ util.verifyPlan("select * from MyTable");
+ }
+
+ @Test
+ public void testTransposeWithUdf() {
+ util.addFunction("func1", new InnerUdf());
+ String ddl = "create table MyTable(" +
+ " a int,\n" +
+ " b bigint,\n" +
+ " c timestamp(3)," +
+ " d as func1(c)," +
+ " watermark for d as d - interval '5'
second\n" +
+ ") WITH (\n" +
+ " 'connector' = 'values',\n" +
+ " 'table-source-class' =
'WATERMARK_PUSH_DOWN',\n" +
+ " 'bounded' = 'false'\n" +
+ ")";
+ util.tableEnv().executeSql(ddl);
+ util.verifyPlan("select * from MyTable");
Review comment:
add some tests for:
1. projection/filter in `select`,
2. a watermark expression that contains two field references
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WatermarkAssignerProjectTransposeRule.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WatermarkAssignerProjectTransposeRule.
+ * */
+public class WatermarkAssignerProjectTransposeRule extends RelOptRule {
+ public static final WatermarkAssignerProjectTransposeRule INSTANCE =
new WatermarkAssignerProjectTransposeRule();
+
+ public WatermarkAssignerProjectTransposeRule() {
+ super(operand(LogicalWatermarkAssigner.class,
+ operand(LogicalProject.class,
+ operand(LogicalTableScan.class,
none()))),
+ "WatermarkAssignerProjectTransposeRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(2);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ return tableSourceTable != null &&
tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+ LogicalProject project = call.rel(1);
+
+ RexNode computedColumn =
project.getProjects().get(watermarkAssigner.rowtimeFieldIndex());
+
+ RexNode newWatermarkExpr =
watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) {
+ return computedColumn;
+ }
+ });
+
+ // use -1 to indicate rowtime column is not in scan and
watermark generator has to calculate it.
+ LogicalWatermarkAssigner newWatermarkAssigner =
+ (LogicalWatermarkAssigner)
watermarkAssigner.copy(watermarkAssigner.getTraitSet(),
+ project.getInput(),
+ -1,
Review comment:
Could this rule push the watermark into the scan directly? Then we would not need to
change the semantics of `LogicalWatermarkAssigner#rowtimeFieldIndex`. It's a
little strange that `rowtimeFieldIndex` is -1. (We can extract a base class for
`PushWatermarkIntoTableSourceScanRule` and
`WatermarkAssignerProjectTransposeRule`.)
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableSourceBase.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RuntimeConverter;
+import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class.
+ * */
+public abstract class TestValuesTableSourceBase implements ScanTableSource,
LookupTableSource {
+
+ protected TableSchema physicalSchema;
+ protected final ChangelogMode changelogMode;
+ protected final boolean bounded;
+ protected final String runtimeSource;
+ /* If source table is not partitionable, we will put all data into a
emptyMap. */
+ protected Map<Map<String, String>, Collection<Row>> data;
+ protected final boolean isAsync;
+ protected final @Nullable String lookupFunctionClass;
+
+ protected TestValuesTableSourceBase(
+ TableSchema physicalSchema,
+ ChangelogMode changelogMode,
+ boolean bounded,
+ String runtimeSource,
+ Map<Map<String, String>, Collection<Row>> data,
+ boolean isAsync,
+ @Nullable String lookupFunctionClass) {
+ this.physicalSchema = physicalSchema;
+ this.changelogMode = changelogMode;
+ this.bounded = bounded;
+ this.runtimeSource = runtimeSource;
+ this.data = data;
+ this.isAsync = isAsync;
+ this.lookupFunctionClass = lookupFunctionClass;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return changelogMode;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext
runtimeProviderContext) {
+ TypeSerializer<RowData> serializer = (TypeSerializer<RowData>)
runtimeProviderContext
+
.createTypeInformation(physicalSchema.toRowDataType())
+ .createSerializer(new ExecutionConfig());
+ DataStructureConverter converter =
runtimeProviderContext.createDataStructureConverter(physicalSchema.toRowDataType());
+
converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
+ Collection<RowData> values = convertToRowData(converter);
+
+ if (runtimeSource.equals("SourceFunction")) {
+ try {
+ return SourceFunctionProvider.of(
+ new
FromElementsFunction<>(serializer, values),
+ bounded);
+ } catch (IOException e) {
+ throw new TableException("Fail to init source
function", e);
+ }
+ } else if (runtimeSource.equals("InputFormat")) {
+ return InputFormatProvider.of(new
CollectionInputFormat<>(values, serializer));
+ } else {
+ throw new IllegalArgumentException("Unsupported runtime
source class: " + runtimeSource);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
+ if (lookupFunctionClass != null) {
+ // use the specified lookup function
+ try {
+ Class<?> clazz =
Class.forName(lookupFunctionClass);
+ Object udtf =
InstantiationUtil.instantiate(clazz);
+ if (udtf instanceof TableFunction) {
+ return
TableFunctionProvider.of((TableFunction) udtf);
+ } else {
+ return
AsyncTableFunctionProvider.of((AsyncTableFunction) udtf);
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Could not
instantiate class: " + lookupFunctionClass);
+ }
+ }
+
+ int[] lookupIndices = Arrays.stream(context.getKeys())
+ .mapToInt(k -> k[0])
+ .toArray();
+ Map<Row, List<Row>> mapping = new HashMap<>();
+
+ data.get(Collections.emptyMap()).forEach(record -> {
+ Row key = Row.of(Arrays.stream(lookupIndices)
+ .mapToObj(record::getField)
+ .toArray());
+ List<Row> list = mapping.get(key);
+ if (list != null) {
+ list.add(record);
+ } else {
+ list = new ArrayList<>();
+ list.add(record);
+ mapping.put(key, list);
+ }
+ });
+ if (isAsync) {
+ return AsyncTableFunctionProvider.of(new
TestValuesRuntimeFunctions.AsyncTestValueLookupFunction(mapping));
+ } else {
+ return TableFunctionProvider.of(new
TestValuesRuntimeFunctions.TestValuesLookupFunction(mapping));
+ }
+ }
+
+ protected Collection<RowData> convertToRowData(DataStructureConverter
converter) {
+ List<RowData> result = new ArrayList<>();
+ for (Row value : handle()) {
+ RowData rowData = (RowData) converter.toInternal(value);
+ if (rowData != null) {
+ rowData.setRowKind(value.getKind());
+ result.add(rowData);
+ }
+ }
+ return result;
+ }
+
+ /*
+ * Used by apply method to deal with.
+ * */
Review comment:
```
/**
*
*/
```
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/SourceWatermarkITCase.scala
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql
+
+import java.sql.Timestamp
+import java.time.LocalDateTime
+
+import org.apache.flink.api.common.eventtime.Watermark
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.data.TimestampData
+import org.apache.flink.table.planner.factories.TestValuesTableFactory
+import
org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRuleTest
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
+import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase,
TestingAppendSink}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters._
+
+class SourceWatermarkITCase extends StreamingTestBase{
+ @Before
+ override def before(): Unit = {
+ super.before()
+ env.setParallelism(1)
Review comment:
Why is the parallelism set to 1?
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
##########
@@ -32,21 +32,27 @@ import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctio
import org.apache.flink.table.runtime.generated.WatermarkGenerator
import org.apache.flink.table.types.logical.{IntType, TimestampType}
import org.apache.flink.table.utils.CatalogManagerMocks
-
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.ConventionTraitDef
import org.apache.calcite.rel.`type`.RelDataType
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.Test
-
import java.lang.{Integer => JInt, Long => JLong}
+import java.util
import java.util.Collections
import java.util.function.{Function => JFunction, Supplier => JSupplier}
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier
Review comment:
reorder imports
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
##########
@@ -35,7 +37,8 @@ object WatermarkGeneratorCodeGenerator {
def generateWatermarkGenerator(
config: TableConfig,
inputType: RowType,
- watermarkExpr: RexNode): GeneratedWatermarkGenerator = {
+ watermarkExpr: RexNode,
+ contextTerm: String = null): GeneratedWatermarkGenerator = {
Review comment:
please use `None` instead of `null` in Scala
##########
File path:
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/abilities/PeriodicWatermarkAssignerProvider.java
##########
@@ -28,6 +28,7 @@
* generating watermarks in {@link ScanTableSource}.
*/
@PublicEvolving
+@Deprecated
Review comment:
It would be better to add a comment explaining the `@Deprecated` annotation.
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
##########
@@ -154,7 +199,9 @@ class WatermarkGeneratorCodeGenTest {
assertTrue(JavaFunc5.closeCalled)
}
- private def generateWatermarkGenerator(expr: String): WatermarkGenerator = {
+
+ private def generateWatermarkGenerator(expr: String,
+ useDefinedConstructor: Boolean = true): WatermarkGenerator = {
Review comment:
nit: remove the default value
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WatermarkAssignerProjectTransposeRule.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WatermarkAssignerProjectTransposeRule.
+ * */
+public class WatermarkAssignerProjectTransposeRule extends RelOptRule {
+ public static final WatermarkAssignerProjectTransposeRule INSTANCE =
new WatermarkAssignerProjectTransposeRule();
+
+ public WatermarkAssignerProjectTransposeRule() {
+ super(operand(LogicalWatermarkAssigner.class,
+ operand(LogicalProject.class,
+ operand(LogicalTableScan.class,
none()))),
+ "WatermarkAssignerProjectTransposeRule");
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ LogicalTableScan scan = call.rel(2);
+ TableSourceTable tableSourceTable =
scan.getTable().unwrap(TableSourceTable.class);
+ return tableSourceTable != null &&
tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ LogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+ LogicalProject project = call.rel(1);
+
+ RexNode computedColumn =
project.getProjects().get(watermarkAssigner.rowtimeFieldIndex());
+
+ RexNode newWatermarkExpr =
watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) {
+ return computedColumn;
Review comment:
Are all input refs in the watermark expression from the rowtime field?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/WatermarkAssignerProjectTransposeRule.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WatermarkAssignerProjectTransposeRule.
+ * */
+public class WatermarkAssignerProjectTransposeRule extends RelOptRule {
Review comment:
please add some comments to explain the purpose of this rule
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableSourceBase.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.io.CollectionInputFormat;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RuntimeConverter;
+import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
+import org.apache.flink.table.connector.source.InputFormatProvider;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base class.
Review comment:
Please give this class a more meaningful comment.
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Test rule PushWatermarkIntoTableSourceScanRule.
+ * */
+public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
+ private StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+ @Before
+ public void setup() {
+ util.buildStreamProgram(FlinkStreamProgram.DEFAULT_REWRITE());
+ CalciteConfig calciteConfig =
TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+ calciteConfig.getStreamProgram().get().addLast(
+ "PushWatermarkIntoTableSourceScanRule",
+
FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder()
+
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+ .add(RuleSets.ofList(
+
WatermarkAssignerProjectTransposeRule.INSTANCE,
+
PushWatermarkIntoTableSourceScanRule.INSTANCE))
+ .build()
+ );
Review comment:
It would be better to build a specific program that contains only the rules
needed for the current test; this avoids interference from other rules.
##########
File path:
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Test rule PushWatermarkIntoTableSourceScanRule.
Review comment:
Test for [[PushWatermarkIntoTableSourceScanRule]] and
[[WatermarkAssignerProjectTransposeRule]]
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql
+
+import org.apache.flink.table.planner.utils.TableTestBase
+import
org.apache.flink.table.planner.plan.rules.logical.PushWatermarkIntoTableSourceScanRuleTest
+
+import org.junit.Test
+
+class SourceWatermarkTest extends TableTestBase {
Review comment:
Could we extract a base test class for `PushWatermarkIntoTableSourceScanRuleTest`
and `SourceWatermarkTest`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org