godfreyhe commented on a change in pull request #13449: URL: https://github.com/apache/flink/pull/13449#discussion_r518045062
########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanRule.java ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; + + +/** + * Rule to push the {@link FlinkLogicalWatermarkAssigner} into the {@link FlinkLogicalTableSourceScan}. 
+ */ +public class PushWatermarkIntoFlinkTableSourceScanRule extends PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule { Review comment: PushWatermarkIntoTableSourceScanRule ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rex.RexNode; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import scala.Option; + +/** + * Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util to push the {@link FlinkLogicalWatermarkAssigner} + * into the {@link FlinkLogicalTableSourceScan}. 
+ */ +public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule extends RelOptRule { + + public PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule(RelOptRuleOperand operand, + String description) { + super(operand, description); + } + + /** + * It uses the input watermark expression to generate the {@link WatermarkGeneratorSupplier}. After the {@link WatermarkStrategy} + * is pushed into the scan, it will build a new scan. However, when {@link FlinkLogicalWatermarkAssigner} is the parent of the + * {@link FlinkLogicalTableSourceScan} it should modify the rowtime type to keep the type of plan is consistent. In other cases, + * it just keep the data type of the scan as same as before and leave the work when rewriting the projection. + */ + protected FlinkLogicalTableSourceScan getNewScan( + FlinkLogicalWatermarkAssigner watermarkAssigner, RexNode watermarkExpr, FlinkLogicalTableSourceScan scan, FlinkContext context) { + + GeneratedWatermarkGenerator generatedWatermarkGenerator = + WatermarkGeneratorCodeGenerator.generateWatermarkGenerator( + context.getTableConfig(), + FlinkTypeFactory.toLogicalRowType(scan.getRowType()), + watermarkExpr, + Option.apply("context")); + Configuration configuration = context.getTableConfig().getConfiguration(); + + WatermarkGeneratorSupplier<RowData> supplier = new DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator); + String digest = String.format("watermark=[%s]", watermarkExpr); + + WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.forGenerator(supplier); + Duration idleTimeout = configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT); + if (!idleTimeout.isZero() && !idleTimeout.isNegative()) { + watermarkStrategy.withIdleness(idleTimeout); + digest = String.format("%s idletimeout=[%s]", digest, idleTimeout.toMillis()); + } + + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + DynamicTableSource newDynamicTableSource = 
tableSourceTable.tableSource().copy(); + + ((SupportsWatermarkPushDown) newDynamicTableSource).applyWatermark(watermarkStrategy); + // scan row type: set rowtime type + TableSourceTable newTableSourceTable; + if (scan.getRowType().equals(watermarkAssigner.getInput().getRowType())) { + // without projection or project doesn't project or project doesn't add new computed columns + newTableSourceTable = tableSourceTable.copy( + newDynamicTableSource, + watermarkAssigner.getRowType(), + new String[]{digest}); + } else { + // project exists. make the project's rowtype is consistent with the origin plan. + newTableSourceTable = tableSourceTable.copy( + newDynamicTableSource, + scan.getRowType(), + new String[]{digest}); + } + + FlinkLogicalTableSourceScan newScan = FlinkLogicalTableSourceScan.create(scan.getCluster(), + newTableSourceTable); + return newScan; + } + + /** + * Wrapper of the {@link GeneratedWatermarkGenerator} that is used to create {@link WatermarkGenerator}. + * The {@link DefaultWatermarkGeneratorSupplier} uses the {@link WatermarkGeneratorSupplier.Context} to init + * the generated watermark generator. 
+ */ + private static class DefaultWatermarkGeneratorSupplier implements WatermarkGeneratorSupplier<RowData> { + + private static final long serialVersionUID = 1L; + + private final Configuration configuration; + private final GeneratedWatermarkGenerator generatedWatermarkGenerator; + + public DefaultWatermarkGeneratorSupplier(Configuration configuration, + GeneratedWatermarkGenerator generatedWatermarkGenerator) { + this.configuration = configuration; + this.generatedWatermarkGenerator = generatedWatermarkGenerator; + } + + @Override + public WatermarkGenerator<RowData> createWatermarkGenerator(Context context) { + + List<Object> references = new ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences())); + references.add(context); + + org.apache.flink.table.runtime.generated.WatermarkGenerator innerWatermarkGenerator = + new GeneratedWatermarkGenerator( + generatedWatermarkGenerator.getClassName(), + generatedWatermarkGenerator.getCode(), + references.toArray()) + .newInstance(Thread.currentThread().getContextClassLoader()); + + try { + innerWatermarkGenerator.open(configuration); + } catch (Exception e) { + throw new RuntimeException("Fail to instantiate generated watermark generator.", e); + } + return new DefaultWatermarkGeneratorSupplier.DefaultWatermarkGenerator(innerWatermarkGenerator); + } + + /** + * Wrapper of the code-generated {@link org.apache.flink.table.runtime.generated.WatermarkGenerator}. + */ + private class DefaultWatermarkGenerator implements WatermarkGenerator<RowData> { Review comment: add `static` ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanRule.java ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; + + +/** + * Rule to push the {@link FlinkLogicalWatermarkAssigner} into the {@link FlinkLogicalTableSourceScan}. 
+ */ +public class PushWatermarkIntoFlinkTableSourceScanRule extends PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule { + public static final PushWatermarkIntoFlinkTableSourceScanRule INSTANCE = new PushWatermarkIntoFlinkTableSourceScanRule(); + + public PushWatermarkIntoFlinkTableSourceScanRule() { + super(operand(FlinkLogicalWatermarkAssigner.class, + operand(FlinkLogicalTableSourceScan.class, none())), + "PushWatermarkIntoFlinkTableSourceScan"); Review comment: "PushWatermarkIntoFlinkTableSourceScan" -> "PushWatermarkIntoTableSourceScanRule" ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.util.Pair; + +import java.util.List; + +/** + * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}. + * The rule will first look for the computed column in the {@link FlinkLogicalCalc} and then translate the watermark expression + * and the computed column into a {@link WatermarkStrategy}. With the new scan the rule will build a new {@link FlinkLogicalCalc}. 
+ */ +public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule { Review comment: PushWatermarkIntoTableSourceScanAcrossCalcRule ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.util.Pair; + +import java.util.List; + +/** + * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}. + * The rule will first look for the computed column in the {@link FlinkLogicalCalc} and then translate the watermark expression + * and the computed column into a {@link WatermarkStrategy}. With the new scan the rule will build a new {@link FlinkLogicalCalc}. 
+ */ +public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule { + public static final PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule INSTANCE = new PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule(); + + public PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule() { + super(operand(FlinkLogicalWatermarkAssigner.class, + operand(FlinkLogicalCalc.class, + operand(FlinkLogicalTableSourceScan.class, none()))), + "PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalTableSourceScan scan = call.rel(2); + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown; + } + + @Override + public void onMatch(RelOptRuleCall call) { + FlinkLogicalWatermarkAssigner watermarkAssigner = call.rel(0); + FlinkLogicalCalc calc = call.rel(1); + + RexProgram originProgram = calc.getProgram(); + List<RexLocalRef> projectList = originProgram.getProjectList(); + + //get watermark expression + RexNode computedColumn = originProgram.expandLocalRef(projectList.get(watermarkAssigner.rowtimeFieldIndex())); + RexNode newWatermarkExpr = watermarkAssigner.watermarkExpr().accept(new RexShuttle() { + @Override + public RexNode visitInputRef(RexInputRef inputRef) { Review comment: we should only replace the rowtime field ref and other refs should be re-indexed based on TableScan's rowtype. ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.stream.sql + +import org.apache.flink.table.planner.utils.TableTestBase +import org.apache.flink.table.planner.plan.rules.logical._ +import org.junit.Test + +/** + * Tests for [[PushWatermarkIntoFlinkTableSourceScanRule]] and + * [[PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule]]. Review comment: Tests for watermark push down. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.util.Pair; + +import java.util.List; + +/** + * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}. + * The rule will first look for the computed column in the {@link FlinkLogicalCalc} and then translate the watermark expression + * and the computed column into a {@link WatermarkStrategy}. With the new scan the rule will build a new {@link FlinkLogicalCalc}. 
+ */ +public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule { + public static final PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule INSTANCE = new PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule(); + + public PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule() { + super(operand(FlinkLogicalWatermarkAssigner.class, + operand(FlinkLogicalCalc.class, + operand(FlinkLogicalTableSourceScan.class, none()))), + "PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalTableSourceScan scan = call.rel(2); + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown; + } + + @Override + public void onMatch(RelOptRuleCall call) { + FlinkLogicalWatermarkAssigner watermarkAssigner = call.rel(0); + FlinkLogicalCalc calc = call.rel(1); + + RexProgram originProgram = calc.getProgram(); + List<RexLocalRef> projectList = originProgram.getProjectList(); + + //get watermark expression + RexNode computedColumn = originProgram.expandLocalRef(projectList.get(watermarkAssigner.rowtimeFieldIndex())); + RexNode newWatermarkExpr = watermarkAssigner.watermarkExpr().accept(new RexShuttle() { + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + // replace the input ref with the computed column + return computedColumn; + } + }); + + // push watermark assigner into the scan + FlinkLogicalTableSourceScan newScan = + getNewScan(watermarkAssigner, newWatermarkExpr, call.rel(2), (FlinkContext) call.getPlanner().getContext()); + + FlinkTypeFactory typeFactory = (FlinkTypeFactory) watermarkAssigner.getCluster().getTypeFactory(); + RexBuilder builder = call.builder().getRexBuilder(); + // cast timestamp type to rowtime type. 
+ RexNode newRexNode = builder.makeReinterpretCast( Review comment: newComputedColumn ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanBaseRule.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rex.RexNode; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import scala.Option; + +/** + * Base Planner rule for {@link PushWatermarkIntoTableSourceScanRule} and {@link PushWatermarkIntoTableSourceScanAcrossProjectRule}. 
+ */ +public abstract class PushWatermarkIntoTableSourceScanBaseRule extends RelOptRule { + public PushWatermarkIntoTableSourceScanBaseRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected LogicalTableScan getNewScan(LogicalWatermarkAssigner watermarkAssigner, RexNode watermarkExpr, LogicalTableScan scan, FlinkContext context) { Review comment: If only `TableConfig` is used in this method, I suggest passing `TableConfig` instead of `FlinkContext`. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java ########## @@ -331,19 +329,6 @@ private static void validateWatermarks( } } - private static void validateAbilities(DynamicTableSource source) { - UNSUPPORTED_ABILITIES.forEach(ability -> { Review comment: `UNSUPPORTED_ABILITIES` should also be removed. ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rex.RexNode; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import scala.Option; + +/** + * Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util to push the {@link FlinkLogicalWatermarkAssigner} + * into the {@link FlinkLogicalTableSourceScan}. 
+ */ +public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule extends RelOptRule { + + public PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule(RelOptRuleOperand operand, + String description) { + super(operand, description); + } + + /** + * It uses the input watermark expression to generate the {@link WatermarkGeneratorSupplier}. After the {@link WatermarkStrategy} + * is pushed into the scan, it will build a new scan. However, when {@link FlinkLogicalWatermarkAssigner} is the parent of the + * {@link FlinkLogicalTableSourceScan} it should modify the rowtime type to keep the type of plan is consistent. In other cases, + * it just keep the data type of the scan as same as before and leave the work when rewriting the projection. + */ + protected FlinkLogicalTableSourceScan getNewScan( + FlinkLogicalWatermarkAssigner watermarkAssigner, RexNode watermarkExpr, FlinkLogicalTableSourceScan scan, FlinkContext context) { + + GeneratedWatermarkGenerator generatedWatermarkGenerator = + WatermarkGeneratorCodeGenerator.generateWatermarkGenerator( + context.getTableConfig(), + FlinkTypeFactory.toLogicalRowType(scan.getRowType()), + watermarkExpr, + Option.apply("context")); + Configuration configuration = context.getTableConfig().getConfiguration(); + + WatermarkGeneratorSupplier<RowData> supplier = new DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator); + String digest = String.format("watermark=[%s]", watermarkExpr); + + WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.forGenerator(supplier); + Duration idleTimeout = configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT); + if (!idleTimeout.isZero() && !idleTimeout.isNegative()) { + watermarkStrategy.withIdleness(idleTimeout); + digest = String.format("%s idletimeout=[%s]", digest, idleTimeout.toMillis()); + } + + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + DynamicTableSource newDynamicTableSource = 
tableSourceTable.tableSource().copy(); + + ((SupportsWatermarkPushDown) newDynamicTableSource).applyWatermark(watermarkStrategy); + // scan row type: set rowtime type + TableSourceTable newTableSourceTable; + if (scan.getRowType().equals(watermarkAssigner.getInput().getRowType())) { Review comment: add a method parameter to determine the condition ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.util.Pair; + +import java.util.List; + +/** + * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}. + * The rule will first look for the computed column in the {@link FlinkLogicalCalc} and then translate the watermark expression + * and the computed column into a {@link WatermarkStrategy}. With the new scan the rule will build a new {@link FlinkLogicalCalc}. 
+ */ +public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule { + public static final PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule INSTANCE = new PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule(); + + public PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule() { + super(operand(FlinkLogicalWatermarkAssigner.class, + operand(FlinkLogicalCalc.class, + operand(FlinkLogicalTableSourceScan.class, none()))), + "PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalTableSourceScan scan = call.rel(2); + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown; + } + + @Override + public void onMatch(RelOptRuleCall call) { + FlinkLogicalWatermarkAssigner watermarkAssigner = call.rel(0); + FlinkLogicalCalc calc = call.rel(1); + + RexProgram originProgram = calc.getProgram(); + List<RexLocalRef> projectList = originProgram.getProjectList(); Review comment: we can expand the local ref first ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rex.RexNode; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import scala.Option; + +/** + * Base rule for interface {@link 
SupportsWatermarkPushDown}. It offers a util to push the {@link FlinkLogicalWatermarkAssigner} + * into the {@link FlinkLogicalTableSourceScan}. + */ +public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule extends RelOptRule { + + public PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule(RelOptRuleOperand operand, + String description) { + super(operand, description); + } + + /** + * It uses the input watermark expression to generate the {@link WatermarkGeneratorSupplier}. After the {@link WatermarkStrategy} + * is pushed into the scan, it will build a new scan. However, when {@link FlinkLogicalWatermarkAssigner} is the parent of the + * {@link FlinkLogicalTableSourceScan} it should modify the rowtime type to keep the type of plan is consistent. In other cases, + * it just keep the data type of the scan as same as before and leave the work when rewriting the projection. + */ + protected FlinkLogicalTableSourceScan getNewScan( Review comment: wrap to multiple lines? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rex.RexNode; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import scala.Option; + +/** + * Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util to push the {@link FlinkLogicalWatermarkAssigner} + * into the {@link FlinkLogicalTableSourceScan}. 
+ */ +public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule extends RelOptRule { + + public PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule(RelOptRuleOperand operand, + String description) { + super(operand, description); + } + + /** + * It uses the input watermark expression to generate the {@link WatermarkGeneratorSupplier}. After the {@link WatermarkStrategy} + * is pushed into the scan, it will build a new scan. However, when {@link FlinkLogicalWatermarkAssigner} is the parent of the + * {@link FlinkLogicalTableSourceScan} it should modify the rowtime type to keep the type of plan is consistent. In other cases, + * it just keep the data type of the scan as same as before and leave the work when rewriting the projection. + */ + protected FlinkLogicalTableSourceScan getNewScan( + FlinkLogicalWatermarkAssigner watermarkAssigner, RexNode watermarkExpr, FlinkLogicalTableSourceScan scan, FlinkContext context) { + + GeneratedWatermarkGenerator generatedWatermarkGenerator = + WatermarkGeneratorCodeGenerator.generateWatermarkGenerator( + context.getTableConfig(), + FlinkTypeFactory.toLogicalRowType(scan.getRowType()), + watermarkExpr, + Option.apply("context")); + Configuration configuration = context.getTableConfig().getConfiguration(); + + WatermarkGeneratorSupplier<RowData> supplier = new DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator); + String digest = String.format("watermark=[%s]", watermarkExpr); + + WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.forGenerator(supplier); + Duration idleTimeout = configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT); + if (!idleTimeout.isZero() && !idleTimeout.isNegative()) { + watermarkStrategy.withIdleness(idleTimeout); + digest = String.format("%s idletimeout=[%s]", digest, idleTimeout.toMillis()); + } + + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + DynamicTableSource newDynamicTableSource = 
tableSourceTable.tableSource().copy(); + + ((SupportsWatermarkPushDown) newDynamicTableSource).applyWatermark(watermarkStrategy); + // scan row type: set rowtime type + TableSourceTable newTableSourceTable; + if (scan.getRowType().equals(watermarkAssigner.getInput().getRowType())) { + // without projection or project doesn't project or project doesn't add new computed columns + newTableSourceTable = tableSourceTable.copy( + newDynamicTableSource, + watermarkAssigner.getRowType(), + new String[]{digest}); + } else { + // project exists. make the project's rowtype is consistent with the origin plan. + newTableSourceTable = tableSourceTable.copy( + newDynamicTableSource, + scan.getRowType(), + new String[]{digest}); + } + + FlinkLogicalTableSourceScan newScan = FlinkLogicalTableSourceScan.create(scan.getCluster(), + newTableSourceTable); + return newScan; Review comment: return the result directly ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanRuleTest.java ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.planner.calcite.CalciteConfig; +import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder; +import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram; +import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; +import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableConfigUtils; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.apache.calcite.plan.hep.HepMatchOrder; +import org.apache.calcite.tools.RuleSets; +import org.junit.Before; +import org.junit.Test; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; + +/** + * Test rule {@link PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule} and {@link PushWatermarkIntoFlinkTableSourceScanRule}. 
+ * */ +public class PushWatermarkIntoFlinkTableSourceScanRuleTest extends TableTestBase { + private StreamTableTestUtil util = streamTestUtil(new TableConfig()); + + @Before + public void setup() { + util.buildStreamProgram(FlinkStreamProgram.LOGICAL_REWRITE()); + CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig()); + calciteConfig.getStreamProgram().get().addLast( + "PushWatermarkIntoTableSourceScanRule", + FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder() + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE()) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(RuleSets.ofList( + PushWatermarkIntoFlinkTableSourceScanRule.INSTANCE, + PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.INSTANCE)) + .build() + ); + } + + @Test + public void testSimpleWatermark() { + String ddl = + "create table MyTable(" + + " a int,\n" + + " b bigint,\n" + + " c timestamp(3),\n" + + " watermark for c as c - interval '5' second\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'enable-watermark-push-down' = 'true',\n" + + " 'bounded' = 'false',\n" + + " 'disable-lookup' = 'true'" + + ")"; + util.tableEnv().executeSql(ddl); + util.verifyPlan("select a, c from MyTable"); + } + + @Test + public void testWatermarkOnComputedColumn() { + String ddl = + "create table MyTable(" + + " a int,\n" + + " b bigint,\n" + + " c timestamp(3),\n" + + " d as c + interval '5' second,\n" + + " watermark for d as d - interval '5' second\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'enable-watermark-push-down' = 'true',\n" + + " 'bounded' = 'false',\n" + + " 'disable-lookup' = 'true'" + + ")"; + util.tableEnv().executeSql(ddl); + util.verifyPlan("select * from MyTable"); + } + + @Test + public void testWatermarkOnComputedColumnWithQuery() { + String ddl = + "create table MyTable(" + + " a int,\n" + + " b bigint,\n" + + " c timestamp(3) not null,\n" + + " d as c + interval '5' second,\n" + + " watermark for d as 
d - interval '5' second\n" + Review comment: add a test about `watermark for d as d - interval '5' second + other timestamp field` ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanBaseRule.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rex.RexNode; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import scala.Option; + +/** + * Base Planner rule for {@link PushWatermarkIntoTableSourceScanRule} and {@link PushWatermarkIntoTableSourceScanAcrossProjectRule}. 
+ */ +public abstract class PushWatermarkIntoTableSourceScanBaseRule extends RelOptRule { + public PushWatermarkIntoTableSourceScanBaseRule(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + protected LogicalTableScan getNewScan(LogicalWatermarkAssigner watermarkAssigner, RexNode watermarkExpr, LogicalTableScan scan, FlinkContext context) { + // generate an inner watermark generator class that allows us to pass FunctionContext and ClassLoader + GeneratedWatermarkGenerator generatedWatermarkGenerator = + WatermarkGeneratorCodeGenerator.generateWatermarkGenerator( + context.getTableConfig(), + FlinkTypeFactory.toLogicalRowType(scan.getRowType()), + watermarkExpr, + Option.apply("context")); + Configuration configuration = context.getTableConfig().getConfiguration(); + + WatermarkGeneratorSupplier<RowData> supplier = new DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator); + String digest = String.format("watermark=[%s]", watermarkExpr); + + WatermarkStrategy<RowData> watermarkStrategy = WatermarkStrategy.forGenerator(supplier); + Duration idleTimeout = configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT); + if (!idleTimeout.isZero() && !idleTimeout.isNegative()) { + watermarkStrategy.withIdleness(idleTimeout); + digest = String.format("%s idletimeout=[%s]", digest, idleTimeout.toMillis()); Review comment: `%s, idletimeout=[%s]` ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanRuleTest.java ########## @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.planner.calcite.CalciteConfig; +import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder; +import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram; +import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; +import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableConfigUtils; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.apache.calcite.plan.hep.HepMatchOrder; +import org.apache.calcite.tools.RuleSets; +import org.junit.Before; +import org.junit.Test; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; + +/** + * Test rule {@link PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule} and {@link PushWatermarkIntoFlinkTableSourceScanRule}. 
+ * */ +public class PushWatermarkIntoFlinkTableSourceScanRuleTest extends TableTestBase { + private StreamTableTestUtil util = streamTestUtil(new TableConfig()); + + @Before + public void setup() { + util.buildStreamProgram(FlinkStreamProgram.LOGICAL_REWRITE()); Review comment: ditto ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.utils.NestedColumn; +import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil; +import org.apache.flink.table.planner.plan.utils.NestedSchema; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.rules.ProjectRemoveRule; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transpose between the {@link LogicalWatermarkAssigner} and {@link LogicalProject}. If the top level {@link LogicalProject} + * doesn't need rowtime column that is used by {@link LogicalWatermarkAssigner}, the rule will still keep the top level + * {@link LogicalProject} as a pruner. + */ +public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule { + + public static final FlinkProjectWatermarkAssignerTransposeRule INSTANCE = new FlinkProjectWatermarkAssignerTransposeRule(); + + public FlinkProjectWatermarkAssignerTransposeRule() { + super(operand(LogicalProject.class, + operand(LogicalWatermarkAssigner.class, any())), + "FlinkProjectWatermarkAssignerTransposeRule"); + } + + /** + * If the rule has been applied, the projection and filter will be moved to the low level {@link LogicalProject} and + * the top level {@link LogicalProject} will only works as a pruner if exists. 
Therefore, only when top level + * {@link LogicalProject} has calculation or needs to prune columns except for rowtime, the process will apply the rule. + * In some situations, the query keeps the rowtime info and the top level {@link LogicalProject} has the same field + * count as the input of the {@link LogicalWatermarkAssigner}. + * */ + @Override + public boolean matches(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalWatermarkAssigner watermarkAssigner = call.rel(1); + // check RexNode type + boolean allRef = project.getProjects().stream().allMatch(node -> (node instanceof RexInputRef)); + if (!allRef) { + return true; + } + + int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex(); + int rowTimeIndexInProject = indexOfRowtime(project.getProjects(), rowTimeIndex); + if (rowTimeIndexInProject != -1) { + return project.getRowType().getFieldCount() != watermarkAssigner.getRowType().getFieldCount(); + } else { + return (watermarkAssigner.getRowType().getFieldCount() - project.getRowType().getFieldCount()) != 1; + } + + } + + @Override + public void onMatch(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalWatermarkAssigner watermarkAssigner = call.rel(1); + + // whether rowtime field is in the top level projection + RelNode originInput = watermarkAssigner.getInput(); + int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex(); + RelDataType originRowTimeType = originInput.getRowType().getFieldList().get(rowTimeIndex).getValue(); + int rowTimeIndexInTopLevelProject = indexOfRowtime(project.getProjects(), rowTimeIndex); + + // get projects and data type of the transposed LogicalProject + FlinkTypeFactory typeFactory = FlinkTypeFactory.INSTANCE(); + List<RexNode> projectsWithRowtime = rewriteRowtimeType(project.getProjects(), rowTimeIndex, originRowTimeType); + List<String> transposedProjectFieldNames = + project.getNamedProjects().stream().map(pair -> pair.right).collect(Collectors.toList()); + String rowTimeName = 
originInput.getRowType().getFieldNames().get(rowTimeIndex); + int rowTimeIndexInTranposedProject; + if (rowTimeIndexInTopLevelProject == -1) { + projectsWithRowtime.add(new RexInputRef(rowTimeIndex, originRowTimeType)); + + rowTimeIndexInTranposedProject = transposedProjectFieldNames.size(); + transposedProjectFieldNames.add(rowTimeName); + } else { + rowTimeIndexInTranposedProject = rowTimeIndexInTopLevelProject; + } + RelDataType transposedProjectType = typeFactory.createStructType( + projectsWithRowtime.stream().map(RexNode::getType).collect(Collectors.toList()), + transposedProjectFieldNames); + + // build the transposed LogicalProjection + LogicalProject transposedProject = project.copy(project.getTraitSet(), originInput, projectsWithRowtime, transposedProjectType); + + // prepare for rewrite + NestedSchema nestedSchema = NestedProjectionUtil.build(projectsWithRowtime, originInput.getRowType()); + if (rowTimeIndexInTopLevelProject == -1) { + NestedColumn rowtimeColumn = NestedProjectionUtil.createNestedColumnLeaf(rowTimeName, rowTimeIndex, originRowTimeType); + nestedSchema.columns().put(rowTimeName, rowtimeColumn); + } + // label by hand + nestedSchema.columns().get(rowTimeName).setIndex(rowTimeIndexInTranposedProject); + + // build the LogicalWatermarkAssigner + RexBuilder builder = call.builder().getRexBuilder(); + RexNode newWatermarkExpr = + NestedProjectionUtil.rewrite( + Collections.singletonList(watermarkAssigner.watermarkExpr()), + nestedSchema, + builder).get(0); + LogicalWatermarkAssigner newWatermarkAssigner = LogicalWatermarkAssigner.create( + watermarkAssigner.getCluster(), + transposedProject, + rowTimeIndexInTranposedProject, + newWatermarkExpr); + + // build the origin top level LogicalProjection + List<RexNode> newProjects = new ArrayList<>(project.getProjects().size()); + for (int i = 0; i < project.getProjects().size(); i++) { + newProjects.add(new RexInputRef(i, project.getProjects().get(i).getType())); + } + LogicalProject newProject = 
project.copy(project.getTraitSet(), newWatermarkAssigner, newProjects, project.getRowType()); + + if (ProjectRemoveRule.isTrivial(newProject)) { + // drop project if the transformed program merely returns its input + call.transformTo(newWatermarkAssigner); + } else { + call.transformTo(newProject); + } + } + + private static int indexOfRowtime(List<RexNode> projects, int rowTimeIndex) { + for (int i = 0; i < projects.size(); i++) { + RexNode project = projects.get(i); + if (project instanceof RexInputRef) { + if (((RexInputRef) project).getIndex() == rowTimeIndex) { + return i; + } + } + } + return -1; + } + + private static List<RexNode> rewriteRowtimeType(List<RexNode> projects, int rowTimeIndex, RelDataType originType) { + List<RexNode> projectsWithRowtime = projects.stream().map(node -> node.accept(new RexShuttle(){ Review comment: return the result directly ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rex.RexNode; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import scala.Option; + +/** + * Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util to push the {@link FlinkLogicalWatermarkAssigner} + * into the {@link FlinkLogicalTableSourceScan}. 
+ */ +public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule extends RelOptRule { Review comment: the class name can be simplified as `PushWatermarkIntoTableSourceScanRuleBase` ########## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml ########## @@ -0,0 +1,152 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<Root> + <TestCase name="testSimpleWatermark"> + <Resource name="sql"> + <![CDATA[select a, c from MyTable]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a=[$0], c=[$2]) ++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +WatermarkAssigner(rowtime=[c], watermark=[-(c, 5000:INTERVAL SECOND)]) Review comment: Why can't the watermark node be pushed down here? ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRuleTest.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.calcite.CalciteConfig; +import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder; +import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram; +import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; +import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableConfigUtils; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.apache.calcite.plan.hep.HepMatchOrder; +import org.apache.calcite.tools.RuleSets; +import org.junit.Before; +import org.junit.Test; + +/** + * Test for {@link FlinkProjectWatermarkAssignerTransposeRule}. + */ +public class FlinkProjectWatermarkAssignerTransposeRuleTest extends TableTestBase { + private StreamTableTestUtil util = streamTestUtil(new TableConfig()); + + @Before + public void setup() { + util.buildStreamProgram(FlinkStreamProgram.DEFAULT_REWRITE()); + CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig()); + calciteConfig.getStreamProgram().get().addLast( + "ProjectWatermarkAssignerTranspose", + FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder() + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE()) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(RuleSets.ofList(FlinkProjectWatermarkAssignerTransposeRule.INSTANCE)) + .build() + ); + } + + @Test + public void simpleTranspose() { + String ddl = + "CREATE TABLE Source(\n" + + " a INT,\n" + + " b BIGINT,\n" + + " c TIMESTAMP(3),\n" + + "WATERMARK FOR c AS c" + + ") WITH (" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false'\n" + + ")"; + + util.tableEnv().executeSql(ddl); + util.verifyPlan("SELECT a, c FROM Source"); 
+ } + + @Test + public void cannotTranspose() { + String ddl = + "CREATE TABLE Source(\n" + + " a INT,\n" + + " b BIGINT,\n" + + " c TIMESTAMP(3),\n" + + "WATERMARK FOR c AS c" + + ") WITH (" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false'\n" + + ")"; + + util.tableEnv().executeSql(ddl); + util.verifyPlan("SELECT b, a FROM Source"); + } + + @Test + public void transposeWithReorder() { Review comment: should also consider ddl with computed column and watermark with expression ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan; +import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLocalRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; +import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.util.Pair; + +import java.util.List; + +/** + * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}. + * The rule will first look for the computed column in the {@link FlinkLogicalCalc} and then translate the watermark expression + * and the computed column into a {@link WatermarkStrategy}. With the new scan the rule will build a new {@link FlinkLogicalCalc}. 
+ */ +public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule { + public static final PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule INSTANCE = new PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule(); + + public PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule() { + super(operand(FlinkLogicalWatermarkAssigner.class, + operand(FlinkLogicalCalc.class, + operand(FlinkLogicalTableSourceScan.class, none()))), + "PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule"); + } + + @Override + public boolean matches(RelOptRuleCall call) { + FlinkLogicalTableSourceScan scan = call.rel(2); + TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); + return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown; Review comment: we can extract a common method into `PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule` ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala ########## @@ -36,17 +38,22 @@ import org.apache.flink.table.utils.CatalogManagerMocks import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.plan.ConventionTraitDef import org.apache.calcite.rel.`type`.RelDataType -import org.junit.Assert.{assertEquals, assertTrue} -import org.junit.Test import java.lang.{Integer => JInt, Long => JLong} +import java.util import java.util.Collections import java.util.function.{Function => JFunction, Supplier => JSupplier} +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.Test + /** * Tests the generated [[WatermarkGenerator]] from [[WatermarkGeneratorCodeGenerator]]. 
*/ -class WatermarkGeneratorCodeGenTest { +@RunWith(classOf[Parameterized]) +class WatermarkGeneratorCodeGenTest(val useDefinedConstructor: Boolean) { Review comment: nit: remove `val` ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java ########## @@ -463,7 +466,19 @@ private void collectPhysicalFieldsTypes(List<SqlNode> derivedColumns) { boolean nullable = type.getNullable() == null ? true : type.getNullable(); RelDataType relType = type.deriveType(sqlValidator, nullable); // add field name and field type to physical field list - physicalFieldNamesToTypes.put(name, relType); + nonComputedFieldNamesToTypes.put(name, relType); + } else if (derivedColumn instanceof SqlMetadataColumn) { + SqlMetadataColumn metadataColumn = (SqlMetadataColumn) derivedColumn; + String name = metadataColumn.getName().getSimple(); + if (columns.containsKey(name)) { + throw new ValidationException(String.format( + "A column named '%s' already exists in the base table.", + name)); + } + RelDataType relType = metadataColumn.getType() + .deriveType(sqlValidator, metadataColumn.getType().getNullable()); + // add field name and field type to physical field list + nonComputedFieldNamesToTypes.put(name, relType); Review comment: Are there any tests that cover these changes? ########## File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRuleTest.java ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.planner.calcite.CalciteConfig; +import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder; +import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram; +import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE; +import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext; +import org.apache.flink.table.planner.utils.StreamTableTestUtil; +import org.apache.flink.table.planner.utils.TableConfigUtils; +import org.apache.flink.table.planner.utils.TableTestBase; + +import org.apache.calcite.plan.hep.HepMatchOrder; +import org.apache.calcite.tools.RuleSets; +import org.junit.Before; +import org.junit.Test; + +/** + * Test for {@link FlinkProjectWatermarkAssignerTransposeRule}. + */ +public class FlinkProjectWatermarkAssignerTransposeRuleTest extends TableTestBase { + private StreamTableTestUtil util = streamTestUtil(new TableConfig()); + + @Before + public void setup() { + util.buildStreamProgram(FlinkStreamProgram.DEFAULT_REWRITE()); Review comment: the test should only involve the required rules ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanBaseRule.java ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleOperand; +import 
org.apache.calcite.rel.logical.LogicalTableScan; +import org.apache.calcite.rex.RexNode; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import scala.Option; + +/** + * Base Planner rule for {@link PushWatermarkIntoTableSourceScanRule} and {@link PushWatermarkIntoTableSourceScanAcrossProjectRule}. Review comment: update the comments ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala ########## @@ -87,3 +119,47 @@ object WatermarkGeneratorCodeGenerator { new GeneratedWatermarkGenerator(funcName, funcCode, ctx.references.toArray) } } + +class WatermarkGeneratorCodeGeneratorContext( + tableConfig: TableConfig, + contextTerm: String = "parameters") extends CodeGeneratorContext(tableConfig) { + + override def addReusableFunction( + function: UserDefinedFunction, + functionContextClass: Class[_ <: FunctionContext] = classOf[FunctionContext], + runtimeContextTerm: String = null): String = { + super.addReusableFunction( + function, classOf[WatermarkGeneratorCodeGeneratorFunctionContextWrapper], this.contextTerm) + } + + override def addReusableConverter( + dataType: DataType, + classLoaderTerm: String = null): String = { + super.addReusableConverter(dataType, "this.getClass().getClassLoader()") + } +} + +class WatermarkGeneratorCodeGeneratorFunctionContextWrapper( Review comment: use a shorter name: `WatermarkGeneratorFunctionContext` ? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossProjectRule.java ########## @@ -36,12 +36,13 @@ import java.util.List; /** - * WatermarkAssignerProjectTransposeRule. + * Planner rule that push {@link LogicalWatermarkAssigner} into a {@link LogicalTableScan} + * which wraps a {@link SupportsWatermarkPushDown} dynamic table source across {@link LogicalProject}. 
* */ -public class WatermarkAssignerProjectTransposeRule extends RelOptRule { - public static final WatermarkAssignerProjectTransposeRule INSTANCE = new WatermarkAssignerProjectTransposeRule(); +public class PushWatermarkIntoTableSourceScanAcrossProjectRule extends PushWatermarkIntoTableSourceScanBaseRule { + public static final PushWatermarkIntoTableSourceScanAcrossProjectRule INSTANCE = new PushWatermarkIntoTableSourceScanAcrossProjectRule(); - public WatermarkAssignerProjectTransposeRule() { + public PushWatermarkIntoTableSourceScanAcrossProjectRule() { Review comment: please update the rule description ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.utils.NestedColumn; +import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil; +import org.apache.flink.table.planner.plan.utils.NestedSchema; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.rules.ProjectRemoveRule; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transpose between the {@link LogicalWatermarkAssigner} and {@link LogicalProject}. If the top level {@link LogicalProject} + * doesn't need rowtime column that is used by {@link LogicalWatermarkAssigner}, the rule will still keep the top level + * {@link LogicalProject} as a pruner. + */ +public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule { + + public static final FlinkProjectWatermarkAssignerTransposeRule INSTANCE = new FlinkProjectWatermarkAssignerTransposeRule(); + + public FlinkProjectWatermarkAssignerTransposeRule() { + super(operand(LogicalProject.class, + operand(LogicalWatermarkAssigner.class, any())), + "FlinkProjectWatermarkAssignerTransposeRule"); + } + + /** + * If the rule has been applied, the projection and filter will be moved to the low level {@link LogicalProject} and + * the top level {@link LogicalProject} will only works as a pruner if exists. 
Therefore, only when top level + * {@link LogicalProject} has calculation or needs to prune columns except for rowtime, the process will apply the rule. + * In some situations, the query keeps the rowtime info and the top level {@link LogicalProject} has the same field + * count as the input of the {@link LogicalWatermarkAssigner}. + * */ + @Override + public boolean matches(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalWatermarkAssigner watermarkAssigner = call.rel(1); + // check RexNode type + boolean allRef = project.getProjects().stream().allMatch(node -> (node instanceof RexInputRef)); + if (!allRef) { + return true; + } + + int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex(); + int rowTimeIndexInProject = indexOfRowtime(project.getProjects(), rowTimeIndex); + if (rowTimeIndexInProject != -1) { + return project.getRowType().getFieldCount() != watermarkAssigner.getRowType().getFieldCount(); Review comment: maybe the projection outputs duplicate fields, such as: a, a, a, b, b. You should push down the used fields in the projection. ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala ########## @@ -154,7 +200,9 @@ class WatermarkGeneratorCodeGenTest { assertTrue(JavaFunc5.closeCalled) } - private def generateWatermarkGenerator(expr: String): WatermarkGenerator = { + Review comment: nit: redundant line ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.utils.NestedColumn; +import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil; +import org.apache.flink.table.planner.plan.utils.NestedSchema; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.rules.ProjectRemoveRule; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transpose between the {@link LogicalWatermarkAssigner} and {@link LogicalProject}. If the top level {@link LogicalProject} + * doesn't need rowtime column that is used by {@link LogicalWatermarkAssigner}, the rule will still keep the top level + * {@link LogicalProject} as a pruner. 
+ */ +public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule { + + public static final FlinkProjectWatermarkAssignerTransposeRule INSTANCE = new FlinkProjectWatermarkAssignerTransposeRule(); + + public FlinkProjectWatermarkAssignerTransposeRule() { + super(operand(LogicalProject.class, + operand(LogicalWatermarkAssigner.class, any())), + "FlinkProjectWatermarkAssignerTransposeRule"); + } + + /** + * If the rule has been applied, the projection and filter will be moved to the low level {@link LogicalProject} and + * the top level {@link LogicalProject} will only works as a pruner if exists. Therefore, only when top level + * {@link LogicalProject} has calculation or needs to prune columns except for rowtime, the process will apply the rule. + * In some situations, the query keeps the rowtime info and the top level {@link LogicalProject} has the same field + * count as the input of the {@link LogicalWatermarkAssigner}. + * */ + @Override + public boolean matches(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalWatermarkAssigner watermarkAssigner = call.rel(1); + // check RexNode type + boolean allRef = project.getProjects().stream().allMatch(node -> (node instanceof RexInputRef)); + if (!allRef) { + return true; + } + + int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex(); + int rowTimeIndexInProject = indexOfRowtime(project.getProjects(), rowTimeIndex); + if (rowTimeIndexInProject != -1) { + return project.getRowType().getFieldCount() != watermarkAssigner.getRowType().getFieldCount(); + } else { + return (watermarkAssigner.getRowType().getFieldCount() - project.getRowType().getFieldCount()) != 1; + } + + } + + @Override + public void onMatch(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalWatermarkAssigner watermarkAssigner = call.rel(1); + + // whether rowtime field is in the top level projection + RelNode originInput = watermarkAssigner.getInput(); + int rowTimeIndex = 
watermarkAssigner.rowtimeFieldIndex(); + RelDataType originRowTimeType = originInput.getRowType().getFieldList().get(rowTimeIndex).getValue(); + int rowTimeIndexInTopLevelProject = indexOfRowtime(project.getProjects(), rowTimeIndex); + + // get projects and data type of the transposed LogicalProject + FlinkTypeFactory typeFactory = FlinkTypeFactory.INSTANCE(); + List<RexNode> projectsWithRowtime = rewriteRowtimeType(project.getProjects(), rowTimeIndex, originRowTimeType); + List<String> transposedProjectFieldNames = + project.getNamedProjects().stream().map(pair -> pair.right).collect(Collectors.toList()); + String rowTimeName = originInput.getRowType().getFieldNames().get(rowTimeIndex); + int rowTimeIndexInTranposedProject; + if (rowTimeIndexInTopLevelProject == -1) { + projectsWithRowtime.add(new RexInputRef(rowTimeIndex, originRowTimeType)); + + rowTimeIndexInTranposedProject = transposedProjectFieldNames.size(); + transposedProjectFieldNames.add(rowTimeName); + } else { + rowTimeIndexInTranposedProject = rowTimeIndexInTopLevelProject; + } + RelDataType transposedProjectType = typeFactory.createStructType( + projectsWithRowtime.stream().map(RexNode::getType).collect(Collectors.toList()), + transposedProjectFieldNames); + + // build the transposed LogicalProjection + LogicalProject transposedProject = project.copy(project.getTraitSet(), originInput, projectsWithRowtime, transposedProjectType); + + // prepare for rewrite + NestedSchema nestedSchema = NestedProjectionUtil.build(projectsWithRowtime, originInput.getRowType()); + if (rowTimeIndexInTopLevelProject == -1) { + NestedColumn rowtimeColumn = NestedProjectionUtil.createNestedColumnLeaf(rowTimeName, rowTimeIndex, originRowTimeType); + nestedSchema.columns().put(rowTimeName, rowtimeColumn); + } + // label by hand + nestedSchema.columns().get(rowTimeName).setIndex(rowTimeIndexInTranposedProject); + + // build the LogicalWatermarkAssigner + RexBuilder builder = call.builder().getRexBuilder(); + RexNode 
newWatermarkExpr = + NestedProjectionUtil.rewrite( + Collections.singletonList(watermarkAssigner.watermarkExpr()), + nestedSchema, + builder).get(0); + LogicalWatermarkAssigner newWatermarkAssigner = LogicalWatermarkAssigner.create( + watermarkAssigner.getCluster(), + transposedProject, + rowTimeIndexInTranposedProject, + newWatermarkExpr); + + // build the origin top level LogicalProjection + List<RexNode> newProjects = new ArrayList<>(project.getProjects().size()); + for (int i = 0; i < project.getProjects().size(); i++) { + newProjects.add(new RexInputRef(i, project.getProjects().get(i).getType())); + } + LogicalProject newProject = project.copy(project.getTraitSet(), newWatermarkAssigner, newProjects, project.getRowType()); + + if (ProjectRemoveRule.isTrivial(newProject)) { + // drop project if the transformed program merely returns its input + call.transformTo(newWatermarkAssigner); + } else { + call.transformTo(newProject); + } + } + + private static int indexOfRowtime(List<RexNode> projects, int rowTimeIndex) { + for (int i = 0; i < projects.size(); i++) { + RexNode project = projects.get(i); + if (project instanceof RexInputRef) { + if (((RexInputRef) project).getIndex() == rowTimeIndex) { + return i; + } + } + } + return -1; + } + + private static List<RexNode> rewriteRowtimeType(List<RexNode> projects, int rowTimeIndex, RelDataType originType) { Review comment: originType => rowtimeType ########## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala ########## @@ -136,9 +145,46 @@ class WatermarkGeneratorCodeGenTest { "myFunc"), new JavaFunc5 ) - val generator = generateWatermarkGenerator("myFunc(ts, `offset`)") - // mock open and close invoking - generator.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 1)) + val generator = generateWatermarkGenerator("myFunc(ts, `offset`)", + useDefinedConstructor) + if (!useDefinedConstructor) { + // mock open and close 
invoking + generator.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 1)) + } + generator.open(new Configuration()) + val results = data.map(d => generator.currentWatermark(d)) + generator.close() + val expected = List( + JLong.valueOf(995L), + null, + null, + JLong.valueOf(4997L), + JLong.valueOf(3990L), + JLong.valueOf(5992L)) + assertEquals(expected, results) + assertTrue(JavaFunc5.openCalled) + assertTrue(JavaFunc5.closeCalled) + } + + @Test + def testCustomizedWatermark(): Unit = { Review comment: Could the code shared by `testLegacyCustomizedWatermark` and `testCustomizedWatermark` be extracted into a common helper method? ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.utils.NestedColumn; +import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil; +import org.apache.flink.table.planner.plan.utils.NestedSchema; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.rules.ProjectRemoveRule; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transpose between the {@link LogicalWatermarkAssigner} and {@link LogicalProject}. If the top level {@link LogicalProject} + * doesn't need rowtime column that is used by {@link LogicalWatermarkAssigner}, the rule will still keep the top level + * {@link LogicalProject} as a pruner. + */ +public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule { Review comment: remove the prefix `Flink`, it can be simplified as `ProjectWatermarkAssignerTransposeRule` ########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.rules.logical; + +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner; +import org.apache.flink.table.planner.plan.utils.NestedColumn; +import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil; +import org.apache.flink.table.planner.plan.utils.NestedSchema; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.rules.ProjectRemoveRule; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Transpose between the {@link LogicalWatermarkAssigner} and {@link LogicalProject}. If the top level {@link LogicalProject} + * doesn't need rowtime column that is used by {@link LogicalWatermarkAssigner}, the rule will still keep the top level + * {@link LogicalProject} as a pruner. 
+ */ +public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule { + + public static final FlinkProjectWatermarkAssignerTransposeRule INSTANCE = new FlinkProjectWatermarkAssignerTransposeRule(); + + public FlinkProjectWatermarkAssignerTransposeRule() { + super(operand(LogicalProject.class, + operand(LogicalWatermarkAssigner.class, any())), + "FlinkProjectWatermarkAssignerTransposeRule"); + } + + /** + * If the rule has been applied, the projection and filter will be moved to the low level {@link LogicalProject} and + * the top level {@link LogicalProject} will only works as a pruner if exists. Therefore, only when top level + * {@link LogicalProject} has calculation or needs to prune columns except for rowtime, the process will apply the rule. + * In some situations, the query keeps the rowtime info and the top level {@link LogicalProject} has the same field + * count as the input of the {@link LogicalWatermarkAssigner}. + * */ + @Override + public boolean matches(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalWatermarkAssigner watermarkAssigner = call.rel(1); + // check RexNode type + boolean allRef = project.getProjects().stream().allMatch(node -> (node instanceof RexInputRef)); + if (!allRef) { + return true; + } + + int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex(); + int rowTimeIndexInProject = indexOfRowtime(project.getProjects(), rowTimeIndex); + if (rowTimeIndexInProject != -1) { + return project.getRowType().getFieldCount() != watermarkAssigner.getRowType().getFieldCount(); + } else { + return (watermarkAssigner.getRowType().getFieldCount() - project.getRowType().getFieldCount()) != 1; + } + + } + + @Override + public void onMatch(RelOptRuleCall call) { + LogicalProject project = call.rel(0); + LogicalWatermarkAssigner watermarkAssigner = call.rel(1); + + // whether rowtime field is in the top level projection + RelNode originInput = watermarkAssigner.getInput(); + int rowTimeIndex = 
watermarkAssigner.rowtimeFieldIndex(); + RelDataType originRowTimeType = originInput.getRowType().getFieldList().get(rowTimeIndex).getValue(); + int rowTimeIndexInTopLevelProject = indexOfRowtime(project.getProjects(), rowTimeIndex); + + // get projects and data type of the transposed LogicalProject + FlinkTypeFactory typeFactory = FlinkTypeFactory.INSTANCE(); Review comment: Use `project.getCluster().getTypeFactory()` instead of `FlinkTypeFactory.INSTANCE()`, and move the declaration close to the place where it is used. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
