godfreyhe commented on a change in pull request #13449:
URL: https://github.com/apache/flink/pull/13449#discussion_r518045062



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanRule.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+
+/**
+ * Rule to push the {@link FlinkLogicalWatermarkAssigner} into the {@link 
FlinkLogicalTableSourceScan}.
+ */
+public class PushWatermarkIntoFlinkTableSourceScanRule extends 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule {

Review comment:
       PushWatermarkIntoTableSourceScanRule

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexNode;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Option;
+
+/**
+ * Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util 
to push the {@link FlinkLogicalWatermarkAssigner}
+ * into the {@link FlinkLogicalTableSourceScan}.
+ */
+public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule 
extends RelOptRule {
+
+       public 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule(RelOptRuleOperand operand,
+                       String description) {
+               super(operand, description);
+       }
+
+       /**
+        * It uses the input watermark expression to generate the {@link 
WatermarkGeneratorSupplier}. After the {@link WatermarkStrategy}
+        * is pushed into the scan, it will build a new scan. However, when 
{@link FlinkLogicalWatermarkAssigner} is the parent of the
+        * {@link FlinkLogicalTableSourceScan} it should modify the rowtime 
type to keep the type of plan is consistent. In other cases,
+        * it just keep the data type of the scan as same as before and leave 
the work when rewriting the projection.
+        */
+       protected FlinkLogicalTableSourceScan getNewScan(
+                       FlinkLogicalWatermarkAssigner watermarkAssigner, 
RexNode watermarkExpr, FlinkLogicalTableSourceScan scan, FlinkContext context) {
+
+               GeneratedWatermarkGenerator generatedWatermarkGenerator =
+                               
WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+                                               context.getTableConfig(),
+                                               
FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+                                               watermarkExpr,
+                                               Option.apply("context"));
+               Configuration configuration = 
context.getTableConfig().getConfiguration();
+
+               WatermarkGeneratorSupplier<RowData> supplier = new 
DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+               String digest = String.format("watermark=[%s]", watermarkExpr);
+
+               WatermarkStrategy<RowData> watermarkStrategy = 
WatermarkStrategy.forGenerator(supplier);
+               Duration idleTimeout = 
configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+               if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+                       watermarkStrategy.withIdleness(idleTimeout);
+                       digest = String.format("%s idletimeout=[%s]", digest, 
idleTimeout.toMillis());
+               }
+
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+               DynamicTableSource newDynamicTableSource = 
tableSourceTable.tableSource().copy();
+
+               ((SupportsWatermarkPushDown) 
newDynamicTableSource).applyWatermark(watermarkStrategy);
+               // scan row type: set rowtime type
+               TableSourceTable newTableSourceTable;
+               if 
(scan.getRowType().equals(watermarkAssigner.getInput().getRowType())) {
+                       // without projection or project doesn't project or 
project doesn't add new computed columns
+                       newTableSourceTable = tableSourceTable.copy(
+                                       newDynamicTableSource,
+                                       watermarkAssigner.getRowType(),
+                                       new String[]{digest});
+               } else {
+                       // project exists. make the project's rowtype is 
consistent with the origin plan.
+                       newTableSourceTable = tableSourceTable.copy(
+                                       newDynamicTableSource,
+                                       scan.getRowType(),
+                                       new String[]{digest});
+               }
+
+               FlinkLogicalTableSourceScan newScan = 
FlinkLogicalTableSourceScan.create(scan.getCluster(),
+                               newTableSourceTable);
+               return newScan;
+       }
+
+       /**
+        * Wrapper of the {@link GeneratedWatermarkGenerator} that is used to 
create {@link WatermarkGenerator}.
+        * The {@link DefaultWatermarkGeneratorSupplier} uses the {@link 
WatermarkGeneratorSupplier.Context} to init
+        * the generated watermark generator.
+        */
+       private static class DefaultWatermarkGeneratorSupplier implements 
WatermarkGeneratorSupplier<RowData> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final Configuration configuration;
+               private final GeneratedWatermarkGenerator 
generatedWatermarkGenerator;
+
+               public DefaultWatermarkGeneratorSupplier(Configuration 
configuration,
+                               GeneratedWatermarkGenerator 
generatedWatermarkGenerator) {
+                       this.configuration = configuration;
+                       this.generatedWatermarkGenerator = 
generatedWatermarkGenerator;
+               }
+
+               @Override
+               public WatermarkGenerator<RowData> 
createWatermarkGenerator(Context context) {
+
+                       List<Object> references = new 
ArrayList<>(Arrays.asList(generatedWatermarkGenerator.getReferences()));
+                       references.add(context);
+
+                       
org.apache.flink.table.runtime.generated.WatermarkGenerator 
innerWatermarkGenerator =
+                                       new GeneratedWatermarkGenerator(
+                                                       
generatedWatermarkGenerator.getClassName(),
+                                                       
generatedWatermarkGenerator.getCode(),
+                                                       references.toArray())
+                                                       
.newInstance(Thread.currentThread().getContextClassLoader());
+
+                       try {
+                               innerWatermarkGenerator.open(configuration);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Fail to instantiate 
generated watermark generator.", e);
+                       }
+                       return new 
DefaultWatermarkGeneratorSupplier.DefaultWatermarkGenerator(innerWatermarkGenerator);
+               }
+
+               /**
+                * Wrapper of the code-generated {@link 
org.apache.flink.table.runtime.generated.WatermarkGenerator}.
+                */
+               private class DefaultWatermarkGenerator implements 
WatermarkGenerator<RowData> {

Review comment:
       add `static` 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanRule.java
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+
+
+/**
+ * Rule to push the {@link FlinkLogicalWatermarkAssigner} into the {@link 
FlinkLogicalTableSourceScan}.
+ */
+public class PushWatermarkIntoFlinkTableSourceScanRule extends 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule {
+       public static final PushWatermarkIntoFlinkTableSourceScanRule INSTANCE 
= new PushWatermarkIntoFlinkTableSourceScanRule();
+
+       public PushWatermarkIntoFlinkTableSourceScanRule() {
+               super(operand(FlinkLogicalWatermarkAssigner.class,
+                               operand(FlinkLogicalTableSourceScan.class, 
none())),
+                               "PushWatermarkIntoFlinkTableSourceScan");

Review comment:
       "PushWatermarkIntoFlinkTableSourceScan" -> 
"PushWatermarkIntoTableSourceScanRule"

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+
+/**
+ * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}.
+ * The rule will first look for the computed column in the {@link 
FlinkLogicalCalc} and then translate the watermark expression
+ * and the computed column into a {@link WatermarkStrategy}. With the new scan 
the rule will build a new {@link FlinkLogicalCalc}.
+ */
+public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule {

Review comment:
       PushWatermarkIntoTableSourceScanAcrossCalcRule

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+
+/**
+ * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}.
+ * The rule will first look for the computed column in the {@link 
FlinkLogicalCalc} and then translate the watermark expression
+ * and the computed column into a {@link WatermarkStrategy}. With the new scan 
the rule will build a new {@link FlinkLogicalCalc}.
+ */
+public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule {
+       public static final PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule 
INSTANCE = new PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule();
+
+       public PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule() {
+               super(operand(FlinkLogicalWatermarkAssigner.class,
+                               operand(FlinkLogicalCalc.class,
+                                               
operand(FlinkLogicalTableSourceScan.class, none()))),
+                               
"PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               FlinkLogicalTableSourceScan scan = call.rel(2);
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+               return tableSourceTable != null && 
tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               FlinkLogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+               FlinkLogicalCalc calc = call.rel(1);
+
+               RexProgram originProgram = calc.getProgram();
+               List<RexLocalRef> projectList = originProgram.getProjectList();
+
+               //get watermark expression
+               RexNode computedColumn = 
originProgram.expandLocalRef(projectList.get(watermarkAssigner.rowtimeFieldIndex()));
+               RexNode newWatermarkExpr = 
watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {

Review comment:
       we should only replace the rowtime field ref and other refs should be 
re-indexed based on TableScan's rowtype.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.scala
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql
+
+import org.apache.flink.table.planner.utils.TableTestBase
+import org.apache.flink.table.planner.plan.rules.logical._
+import org.junit.Test
+
+/**
+ * Tests for [[PushWatermarkIntoFlinkTableSourceScanRule]] and
+ * [[PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule]].

Review comment:
       Tests for watermark push down.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+
+/**
+ * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}.
+ * The rule will first look for the computed column in the {@link 
FlinkLogicalCalc} and then translate the watermark expression
+ * and the computed column into a {@link WatermarkStrategy}. With the new scan 
the rule will build a new {@link FlinkLogicalCalc}.
+ */
+public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule {
+       public static final PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule 
INSTANCE = new PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule();
+
+       public PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule() {
+               super(operand(FlinkLogicalWatermarkAssigner.class,
+                               operand(FlinkLogicalCalc.class,
+                                               
operand(FlinkLogicalTableSourceScan.class, none()))),
+                               
"PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               FlinkLogicalTableSourceScan scan = call.rel(2);
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+               return tableSourceTable != null && 
tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               FlinkLogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+               FlinkLogicalCalc calc = call.rel(1);
+
+               RexProgram originProgram = calc.getProgram();
+               List<RexLocalRef> projectList = originProgram.getProjectList();
+
+               //get watermark expression
+               RexNode computedColumn = 
originProgram.expandLocalRef(projectList.get(watermarkAssigner.rowtimeFieldIndex()));
+               RexNode newWatermarkExpr = 
watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               // replace the input ref with the computed 
column
+                               return computedColumn;
+                       }
+               });
+
+               // push watermark assigner into the scan
+               FlinkLogicalTableSourceScan newScan =
+                               getNewScan(watermarkAssigner, newWatermarkExpr, 
call.rel(2), (FlinkContext) call.getPlanner().getContext());
+
+               FlinkTypeFactory typeFactory = (FlinkTypeFactory) 
watermarkAssigner.getCluster().getTypeFactory();
+               RexBuilder builder = call.builder().getRexBuilder();
+               // cast timestamp type to rowtime type.
+               RexNode newRexNode = builder.makeReinterpretCast(

Review comment:
       newComputedColumn

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanBaseRule.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Option;
+
+/**
+ * Base Planner rule for {@link PushWatermarkIntoTableSourceScanRule} and 
{@link PushWatermarkIntoTableSourceScanAcrossProjectRule}.
+ */
+public abstract class PushWatermarkIntoTableSourceScanBaseRule extends 
RelOptRule {
+       public PushWatermarkIntoTableSourceScanBaseRule(RelOptRuleOperand 
operand, String description) {
+               super(operand, description);
+       }
+
+       protected LogicalTableScan getNewScan(LogicalWatermarkAssigner 
watermarkAssigner, RexNode watermarkExpr, LogicalTableScan scan, FlinkContext 
context) {

Review comment:
       if only `TableConfig` is used in this method, I suggest to pass 
`TableConfig` instead `FlinkContext `.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sources/DynamicSourceUtils.java
##########
@@ -331,19 +329,6 @@ private static void validateWatermarks(
                }
        }
 
-       private static void validateAbilities(DynamicTableSource source) {
-               UNSUPPORTED_ABILITIES.forEach(ability -> {

Review comment:
       UNSUPPORTED_ABILITIES should also be removed

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexNode;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Option;
+
+/**
+ * Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util 
to push the {@link FlinkLogicalWatermarkAssigner}
+ * into the {@link FlinkLogicalTableSourceScan}.
+ */
+public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule 
extends RelOptRule {
+
+       public 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule(RelOptRuleOperand operand,
+                       String description) {
+               super(operand, description);
+       }
+
+       /**
+        * It uses the input watermark expression to generate the {@link 
WatermarkGeneratorSupplier}. After the {@link WatermarkStrategy}
+        * is pushed into the scan, it will build a new scan. However, when 
{@link FlinkLogicalWatermarkAssigner} is the parent of the
+        * {@link FlinkLogicalTableSourceScan} it should modify the rowtime 
type to keep the type of plan is consistent. In other cases,
+        * it just keep the data type of the scan as same as before and leave 
the work when rewriting the projection.
+        */
+       protected FlinkLogicalTableSourceScan getNewScan(
+                       FlinkLogicalWatermarkAssigner watermarkAssigner, 
RexNode watermarkExpr, FlinkLogicalTableSourceScan scan, FlinkContext context) {
+
+               GeneratedWatermarkGenerator generatedWatermarkGenerator =
+                               
WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+                                               context.getTableConfig(),
+                                               
FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+                                               watermarkExpr,
+                                               Option.apply("context"));
+               Configuration configuration = 
context.getTableConfig().getConfiguration();
+
+               WatermarkGeneratorSupplier<RowData> supplier = new 
DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+               String digest = String.format("watermark=[%s]", watermarkExpr);
+
+               WatermarkStrategy<RowData> watermarkStrategy = 
WatermarkStrategy.forGenerator(supplier);
+               Duration idleTimeout = 
configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+               if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+                       watermarkStrategy.withIdleness(idleTimeout);
+                       digest = String.format("%s idletimeout=[%s]", digest, 
idleTimeout.toMillis());
+               }
+
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+               DynamicTableSource newDynamicTableSource = 
tableSourceTable.tableSource().copy();
+
+               ((SupportsWatermarkPushDown) 
newDynamicTableSource).applyWatermark(watermarkStrategy);
+               // scan row type: set rowtime type
+               TableSourceTable newTableSourceTable;
+               if 
(scan.getRowType().equals(watermarkAssigner.getInput().getRowType())) {

Review comment:
       add a method parameter determine the condition

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+
+/**
+ * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}.
+ * The rule will first look for the computed column in the {@link 
FlinkLogicalCalc} and then translate the watermark expression
+ * and the computed column into a {@link WatermarkStrategy}. With the new scan 
the rule will build a new {@link FlinkLogicalCalc}.
+ */
+public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule {
+       public static final PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule 
INSTANCE = new PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule();
+
+       public PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule() {
+               super(operand(FlinkLogicalWatermarkAssigner.class,
+                               operand(FlinkLogicalCalc.class,
+                                               
operand(FlinkLogicalTableSourceScan.class, none()))),
+                               
"PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               FlinkLogicalTableSourceScan scan = call.rel(2);
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+               return tableSourceTable != null && 
tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               FlinkLogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+               FlinkLogicalCalc calc = call.rel(1);
+
+               RexProgram originProgram = calc.getProgram();
+               List<RexLocalRef> projectList = originProgram.getProjectList();

Review comment:
       we can expand the local ref first

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexNode;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Option;
+
+/**
+ * Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util 
to push the {@link FlinkLogicalWatermarkAssigner}
+ * into the {@link FlinkLogicalTableSourceScan}.
+ */
+public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule 
extends RelOptRule {
+
+       public 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule(RelOptRuleOperand operand,
+                       String description) {
+               super(operand, description);
+       }
+
+       /**
+        * It uses the input watermark expression to generate the {@link 
WatermarkGeneratorSupplier}. After the {@link WatermarkStrategy}
+        * is pushed into the scan, it will build a new scan. However, when 
{@link FlinkLogicalWatermarkAssigner} is the parent of the
+        * {@link FlinkLogicalTableSourceScan} it should modify the rowtime 
type to keep the type of plan is consistent. In other cases,
+        * it just keep the data type of the scan as same as before and leave 
the work when rewriting the projection.
+        */
+       protected FlinkLogicalTableSourceScan getNewScan(

Review comment:
       wrap to multiple lines?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexNode;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Option;
+
+/**
+ * Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util 
to push the {@link FlinkLogicalWatermarkAssigner}
+ * into the {@link FlinkLogicalTableSourceScan}.
+ */
+public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule 
extends RelOptRule {
+
+       public 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule(RelOptRuleOperand operand,
+                       String description) {
+               super(operand, description);
+       }
+
+       /**
+        * It uses the input watermark expression to generate the {@link 
WatermarkGeneratorSupplier}. After the {@link WatermarkStrategy}
+        * is pushed into the scan, it will build a new scan. However, when 
{@link FlinkLogicalWatermarkAssigner} is the parent of the
+        * {@link FlinkLogicalTableSourceScan} it should modify the rowtime 
type to keep the type of plan is consistent. In other cases,
+        * it just keep the data type of the scan as same as before and leave 
the work when rewriting the projection.
+        */
+       protected FlinkLogicalTableSourceScan getNewScan(
+                       FlinkLogicalWatermarkAssigner watermarkAssigner, 
RexNode watermarkExpr, FlinkLogicalTableSourceScan scan, FlinkContext context) {
+
+               GeneratedWatermarkGenerator generatedWatermarkGenerator =
+                               
WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+                                               context.getTableConfig(),
+                                               
FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+                                               watermarkExpr,
+                                               Option.apply("context"));
+               Configuration configuration = 
context.getTableConfig().getConfiguration();
+
+               WatermarkGeneratorSupplier<RowData> supplier = new 
DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+               String digest = String.format("watermark=[%s]", watermarkExpr);
+
+               WatermarkStrategy<RowData> watermarkStrategy = 
WatermarkStrategy.forGenerator(supplier);
+               Duration idleTimeout = 
configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+               if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+                       watermarkStrategy.withIdleness(idleTimeout);
+                       digest = String.format("%s idletimeout=[%s]", digest, 
idleTimeout.toMillis());
+               }
+
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+               DynamicTableSource newDynamicTableSource = 
tableSourceTable.tableSource().copy();
+
+               ((SupportsWatermarkPushDown) 
newDynamicTableSource).applyWatermark(watermarkStrategy);
+               // scan row type: set rowtime type
+               TableSourceTable newTableSourceTable;
+               if 
(scan.getRowType().equals(watermarkAssigner.getInput().getRowType())) {
+                       // without projection or project doesn't project or 
project doesn't add new computed columns
+                       newTableSourceTable = tableSourceTable.copy(
+                                       newDynamicTableSource,
+                                       watermarkAssigner.getRowType(),
+                                       new String[]{digest});
+               } else {
+                       // project exists. make the project's rowtype is 
consistent with the origin plan.
+                       newTableSourceTable = tableSourceTable.copy(
+                                       newDynamicTableSource,
+                                       scan.getRowType(),
+                                       new String[]{digest});
+               }
+
+               FlinkLogicalTableSourceScan newScan = 
FlinkLogicalTableSourceScan.create(scan.getCluster(),
+                               newTableSourceTable);
+               return newScan;

Review comment:
       return the result directly

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanRuleTest.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Test rule {@link PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule} and 
{@link PushWatermarkIntoFlinkTableSourceScanRule}.
+ * */
+public class PushWatermarkIntoFlinkTableSourceScanRuleTest extends 
TableTestBase {
+       private StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+       @Before
+       public void setup() {
+               util.buildStreamProgram(FlinkStreamProgram.LOGICAL_REWRITE());
+               CalciteConfig calciteConfig = 
TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+               calciteConfig.getStreamProgram().get().addLast(
+                               "PushWatermarkIntoTableSourceScanRule",
+                               
FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder()
+                                               
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+                                               
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                                               .add(RuleSets.ofList(
+                                                               
PushWatermarkIntoFlinkTableSourceScanRule.INSTANCE,
+                                                               
PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.INSTANCE))
+                                               .build()
+               );
+       }
+
+       @Test
+       public void testSimpleWatermark() {
+               String ddl =
+                               "create table MyTable(" +
+                                               "  a int,\n" +
+                                               "  b bigint,\n" +
+                                               "  c timestamp(3),\n" +
+                                               "  watermark for c as c - 
interval '5' second\n" +
+                                               ") WITH (\n" +
+                                               " 'connector' = 'values',\n" +
+                                               " 'enable-watermark-push-down' 
= 'true',\n" +
+                                               " 'bounded' = 'false',\n" +
+                                               " 'disable-lookup' = 'true'" +
+                                               ")";
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("select a, c from MyTable");
+       }
+
+       @Test
+       public void testWatermarkOnComputedColumn() {
+               String ddl =
+                               "create table MyTable(" +
+                                               "  a int,\n" +
+                                               "  b bigint,\n" +
+                                               "  c timestamp(3),\n" +
+                                               "  d as c + interval '5' 
second,\n" +
+                                               "  watermark for d as d - 
interval '5' second\n" +
+                                               ") WITH (\n" +
+                                               " 'connector' = 'values',\n" +
+                                               " 'enable-watermark-push-down' 
= 'true',\n" +
+                                               " 'bounded' = 'false',\n" +
+                                               " 'disable-lookup' = 'true'" +
+                                               ")";
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("select * from MyTable");
+       }
+
+       @Test
+       public void testWatermarkOnComputedColumnWithQuery() {
+               String ddl =
+                               "create table MyTable(" +
+                                               "  a int,\n" +
+                                               "  b bigint,\n" +
+                                               "  c timestamp(3) not null,\n" +
+                                               "  d as c + interval '5' 
second,\n" +
+                                               "  watermark for d as d - 
interval '5' second\n" +

Review comment:
       add a test about `watermark for d as d - interval '5' second + other 
timestamp field`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanBaseRule.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Option;
+
+/**
+ * Base Planner rule for {@link PushWatermarkIntoTableSourceScanRule} and 
{@link PushWatermarkIntoTableSourceScanAcrossProjectRule}.
+ */
+public abstract class PushWatermarkIntoTableSourceScanBaseRule extends 
RelOptRule {
+       public PushWatermarkIntoTableSourceScanBaseRule(RelOptRuleOperand 
operand, String description) {
+               super(operand, description);
+       }
+
+       protected LogicalTableScan getNewScan(LogicalWatermarkAssigner 
watermarkAssigner, RexNode watermarkExpr, LogicalTableScan scan, FlinkContext 
context) {
+               // generate an inner watermark generator class that allows us 
to pass FunctionContext and ClassLoader
+               GeneratedWatermarkGenerator generatedWatermarkGenerator =
+                               
WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(
+                                               context.getTableConfig(),
+                                               
FlinkTypeFactory.toLogicalRowType(scan.getRowType()),
+                                               watermarkExpr,
+                                               Option.apply("context"));
+               Configuration configuration = 
context.getTableConfig().getConfiguration();
+
+               WatermarkGeneratorSupplier<RowData> supplier = new 
DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
+               String digest = String.format("watermark=[%s]", watermarkExpr);
+
+               WatermarkStrategy<RowData> watermarkStrategy = 
WatermarkStrategy.forGenerator(supplier);
+               Duration idleTimeout = 
configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
+               if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
+                       watermarkStrategy.withIdleness(idleTimeout);
+                       digest = String.format("%s idletimeout=[%s]", digest, 
idleTimeout.toMillis());

Review comment:
       `%s, idletimeout=[%s]`

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanRuleTest.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Test rule {@link PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule} and 
{@link PushWatermarkIntoFlinkTableSourceScanRule}.
+ * */
+public class PushWatermarkIntoFlinkTableSourceScanRuleTest extends 
TableTestBase {
+       private StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+       @Before
+       public void setup() {
+               util.buildStreamProgram(FlinkStreamProgram.LOGICAL_REWRITE());

Review comment:
       ditto

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. If the top level {@link LogicalProject}
+ * doesn't need rowtime column that is used by {@link 
LogicalWatermarkAssigner}, the rule will still keep the top level
+ * {@link LogicalProject} as a pruner.
+ */
+public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule {
+
+       public static final FlinkProjectWatermarkAssignerTransposeRule INSTANCE 
= new FlinkProjectWatermarkAssignerTransposeRule();
+
+       public FlinkProjectWatermarkAssignerTransposeRule() {
+               super(operand(LogicalProject.class,
+                               operand(LogicalWatermarkAssigner.class, any())),
+                               "FlinkProjectWatermarkAssignerTransposeRule");
+       }
+
+       /**
+        * If the rule has been applied, the projection and filter will be 
moved to the low level {@link LogicalProject} and
+        * the top level {@link LogicalProject} will only works as a pruner if 
exists. Therefore, only when top level
+        * {@link LogicalProject} has calculation or needs to prune columns 
except for rowtime, the process will apply the rule.
+        * In some situations, the query keeps the rowtime info and the top 
level {@link LogicalProject} has the same field
+        * count as the input of the {@link LogicalWatermarkAssigner}.
+        * */
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+               // check RexNode type
+               boolean allRef = project.getProjects().stream().allMatch(node 
-> (node instanceof RexInputRef));
+               if (!allRef) {
+                       return true;
+               }
+
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               int rowTimeIndexInProject = 
indexOfRowtime(project.getProjects(), rowTimeIndex);
+               if (rowTimeIndexInProject != -1) {
+                       return project.getRowType().getFieldCount() != 
watermarkAssigner.getRowType().getFieldCount();
+               } else {
+                       return (watermarkAssigner.getRowType().getFieldCount() 
- project.getRowType().getFieldCount()) != 1;
+               }
+
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               // whether rowtime field is in the top level projection
+               RelNode originInput = watermarkAssigner.getInput();
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               RelDataType originRowTimeType = 
originInput.getRowType().getFieldList().get(rowTimeIndex).getValue();
+               int rowTimeIndexInTopLevelProject = 
indexOfRowtime(project.getProjects(), rowTimeIndex);
+
+               // get projects and data type of the transposed LogicalProject
+               FlinkTypeFactory typeFactory = FlinkTypeFactory.INSTANCE();
+               List<RexNode> projectsWithRowtime = 
rewriteRowtimeType(project.getProjects(), rowTimeIndex, originRowTimeType);
+               List<String> transposedProjectFieldNames =
+                               project.getNamedProjects().stream().map(pair -> 
pair.right).collect(Collectors.toList());
+               String rowTimeName = 
originInput.getRowType().getFieldNames().get(rowTimeIndex);
+               int rowTimeIndexInTranposedProject;
+               if (rowTimeIndexInTopLevelProject == -1) {
+                       projectsWithRowtime.add(new RexInputRef(rowTimeIndex, 
originRowTimeType));
+
+                       rowTimeIndexInTranposedProject = 
transposedProjectFieldNames.size();
+                       transposedProjectFieldNames.add(rowTimeName);
+               } else {
+                       rowTimeIndexInTranposedProject = 
rowTimeIndexInTopLevelProject;
+               }
+               RelDataType transposedProjectType = 
typeFactory.createStructType(
+                               
projectsWithRowtime.stream().map(RexNode::getType).collect(Collectors.toList()),
+                               transposedProjectFieldNames);
+
+               // build the transposed LogicalProjection
+               LogicalProject transposedProject = 
project.copy(project.getTraitSet(), originInput, projectsWithRowtime, 
transposedProjectType);
+
+               // prepare for rewrite
+               NestedSchema nestedSchema = 
NestedProjectionUtil.build(projectsWithRowtime, originInput.getRowType());
+               if (rowTimeIndexInTopLevelProject == -1) {
+                       NestedColumn rowtimeColumn = 
NestedProjectionUtil.createNestedColumnLeaf(rowTimeName, rowTimeIndex, 
originRowTimeType);
+                       nestedSchema.columns().put(rowTimeName, rowtimeColumn);
+               }
+               // label by hand
+               
nestedSchema.columns().get(rowTimeName).setIndex(rowTimeIndexInTranposedProject);
+
+               // build the LogicalWatermarkAssigner
+               RexBuilder builder = call.builder().getRexBuilder();
+               RexNode newWatermarkExpr =
+                               NestedProjectionUtil.rewrite(
+                                               
Collections.singletonList(watermarkAssigner.watermarkExpr()),
+                                               nestedSchema,
+                                               builder).get(0);
+               LogicalWatermarkAssigner newWatermarkAssigner = 
LogicalWatermarkAssigner.create(
+                               watermarkAssigner.getCluster(),
+                               transposedProject,
+                               rowTimeIndexInTranposedProject,
+                               newWatermarkExpr);
+
+               // build the origin top level LogicalProjection
+               List<RexNode> newProjects = new 
ArrayList<>(project.getProjects().size());
+               for (int i = 0; i < project.getProjects().size(); i++) {
+                       newProjects.add(new RexInputRef(i, 
project.getProjects().get(i).getType()));
+               }
+               LogicalProject newProject = project.copy(project.getTraitSet(), 
newWatermarkAssigner, newProjects, project.getRowType());
+
+               if (ProjectRemoveRule.isTrivial(newProject)) {
+                       // drop project if the transformed program merely 
returns its input
+                       call.transformTo(newWatermarkAssigner);
+               } else {
+                       call.transformTo(newProject);
+               }
+       }
+
+       private static int indexOfRowtime(List<RexNode> projects, int 
rowTimeIndex) {
+               for (int i = 0; i < projects.size(); i++) {
+                       RexNode project = projects.get(i);
+                       if (project instanceof RexInputRef) {
+                               if (((RexInputRef) project).getIndex() == 
rowTimeIndex) {
+                                       return i;
+                               }
+                       }
+               }
+               return -1;
+       }
+
+       private static List<RexNode> rewriteRowtimeType(List<RexNode> projects, 
int rowTimeIndex, RelDataType originType) {
+               List<RexNode> projectsWithRowtime = projects.stream().map(node 
-> node.accept(new RexShuttle(){

Review comment:
       return the result directly

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexNode;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Option;
+
+/**
+ * Base rule for interface {@link SupportsWatermarkPushDown}. It offers a util 
to push the {@link FlinkLogicalWatermarkAssigner}
+ * into the {@link FlinkLogicalTableSourceScan}.
+ */
+public abstract class PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule 
extends RelOptRule {

Review comment:
       the class name can be simplified as 
`PushWatermarkIntoTableSourceScanRuleBase`

##########
File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SourceWatermarkTest.xml
##########
@@ -0,0 +1,152 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testSimpleWatermark">
+    <Resource name="sql">
+      <![CDATA[select a, c from MyTable]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], c=[$2])
++- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL 
SECOND)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+WatermarkAssigner(rowtime=[c], watermark=[-(c, 5000:INTERVAL SECOND)])

Review comment:
       why the watermark node can't be pushed down

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRuleTest.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkProjectWatermarkAssignerTransposeRule}.
+ */
+public class FlinkProjectWatermarkAssignerTransposeRuleTest extends 
TableTestBase {
+       private StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+       @Before
+       public void setup() {
+               util.buildStreamProgram(FlinkStreamProgram.DEFAULT_REWRITE());
+               CalciteConfig calciteConfig = 
TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
+               calciteConfig.getStreamProgram().get().addLast(
+                               "ProjectWatermarkAssignerTranspose",
+                               
FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder()
+                                               
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+                                               
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                                               
.add(RuleSets.ofList(FlinkProjectWatermarkAssignerTransposeRule.INSTANCE))
+                                               .build()
+               );
+       }
+
+       @Test
+       public void simpleTranspose() {
+               String ddl =
+                               "CREATE TABLE Source(\n" +
+                                               "  a INT,\n" +
+                                               "  b BIGINT,\n" +
+                                               "  c TIMESTAMP(3),\n" +
+                                               "WATERMARK FOR c AS c" +
+                                               ") WITH (" +
+                                               "  'connector' = 'values',\n" +
+                                               "  'bounded' = 'false'\n" +
+                                               ")";
+
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("SELECT a, c FROM Source");
+       }
+
+       @Test
+       public void cannotTranspose() {
+               String ddl =
+                               "CREATE TABLE Source(\n" +
+                                               "  a INT,\n" +
+                                               "  b BIGINT,\n" +
+                                               "  c TIMESTAMP(3),\n" +
+                                               "WATERMARK FOR c AS c" +
+                                               ") WITH (" +
+                                               "  'connector' = 'values',\n" +
+                                               "  'bounded' = 'false'\n" +
+                                               ")";
+
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("SELECT b, a FROM Source");
+       }
+
+       @Test
+       public void transposeWithReorder() {

Review comment:
       should also consider ddl with computed column and watermark with 
expression

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+
+/**
+ * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}.
+ * The rule will first look for the computed column in the {@link 
FlinkLogicalCalc} and then translate the watermark expression
+ * and the computed column into a {@link WatermarkStrategy}. With the new scan 
the rule will build a new {@link FlinkLogicalCalc}.
+ */
+public class PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule extends 
PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule {
+       public static final PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule 
INSTANCE = new PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule();
+
+       public PushWatermarkIntoFlinkTableSourceScanAcrossCalcRule() {
+               super(operand(FlinkLogicalWatermarkAssigner.class,
+                               operand(FlinkLogicalCalc.class,
+                                               
operand(FlinkLogicalTableSourceScan.class, none()))),
+                               
"PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               FlinkLogicalTableSourceScan scan = call.rel(2);
+               TableSourceTable tableSourceTable = 
scan.getTable().unwrap(TableSourceTable.class);
+               return tableSourceTable != null && 
tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;

Review comment:
       we can extract a common method into 
`PushWatermarkIntoFlinkLogicalTableSourceScanBaseRule`

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
##########
@@ -36,17 +38,22 @@ import org.apache.flink.table.utils.CatalogManagerMocks
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.ConventionTraitDef
 import org.apache.calcite.rel.`type`.RelDataType
-import org.junit.Assert.{assertEquals, assertTrue}
-import org.junit.Test
 
 import java.lang.{Integer => JInt, Long => JLong}
+import java.util
 import java.util.Collections
 import java.util.function.{Function => JFunction, Supplier => JSupplier}
 
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.Test
+
 /**
   * Tests the generated [[WatermarkGenerator]] from 
[[WatermarkGeneratorCodeGenerator]].
   */
-class WatermarkGeneratorCodeGenTest {
+@RunWith(classOf[Parameterized])
+class WatermarkGeneratorCodeGenTest(val useDefinedConstructor: Boolean) {

Review comment:
       nit: remove `val`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/MergeTableLikeUtil.java
##########
@@ -463,7 +466,19 @@ private void collectPhysicalFieldsTypes(List<SqlNode> 
derivedColumns) {
                                        boolean nullable = type.getNullable() 
== null ? true : type.getNullable();
                                        RelDataType relType = 
type.deriveType(sqlValidator, nullable);
                                        // add field name and field type to 
physical field list
-                                       physicalFieldNamesToTypes.put(name, 
relType);
+                                       nonComputedFieldNamesToTypes.put(name, 
relType);
+                               } else if (derivedColumn instanceof 
SqlMetadataColumn) {
+                                       SqlMetadataColumn metadataColumn = 
(SqlMetadataColumn) derivedColumn;
+                                       String name = 
metadataColumn.getName().getSimple();
+                                       if (columns.containsKey(name)) {
+                                               throw new 
ValidationException(String.format(
+                                                       "A column named '%s' 
already exists in the base table.",
+                                                       name));
+                                       }
+                                       RelDataType relType = 
metadataColumn.getType()
+                                                                               
        .deriveType(sqlValidator, metadataColumn.getType().getNullable());
+                                       // add field name and field type to 
physical field list
+                                       nonComputedFieldNamesToTypes.put(name, 
relType);

Review comment:
       Are there any tests that cover these changes?

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRuleTest.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.calcite.CalciteConfig;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
+import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableConfigUtils;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkProjectWatermarkAssignerTransposeRule}.
+ */
+public class FlinkProjectWatermarkAssignerTransposeRuleTest extends 
TableTestBase {
+       private StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+       @Before
+       public void setup() {
+               util.buildStreamProgram(FlinkStreamProgram.DEFAULT_REWRITE());

Review comment:
       the test should only involves the required rules

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanBaseRule.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.rex.RexNode;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import scala.Option;
+
+/**
+ * Base Planner rule for {@link PushWatermarkIntoTableSourceScanRule} and 
{@link PushWatermarkIntoTableSourceScanAcrossProjectRule}.

Review comment:
       update the comments

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenerator.scala
##########
@@ -87,3 +119,47 @@ object WatermarkGeneratorCodeGenerator {
     new GeneratedWatermarkGenerator(funcName, funcCode, ctx.references.toArray)
   }
 }
+
+class WatermarkGeneratorCodeGeneratorContext(
+  tableConfig: TableConfig,
+  contextTerm: String = "parameters") extends 
CodeGeneratorContext(tableConfig) {
+
+  override def addReusableFunction(
+      function: UserDefinedFunction,
+      functionContextClass: Class[_ <: FunctionContext] = 
classOf[FunctionContext],
+      runtimeContextTerm: String = null): String = {
+    super.addReusableFunction(
+      function, 
classOf[WatermarkGeneratorCodeGeneratorFunctionContextWrapper], 
this.contextTerm)
+  }
+
+  override def addReusableConverter(
+      dataType: DataType,
+      classLoaderTerm: String = null): String = {
+    super.addReusableConverter(dataType, "this.getClass().getClassLoader()")
+  }
+}
+
+class WatermarkGeneratorCodeGeneratorFunctionContextWrapper(

Review comment:
       use a shorter name: `WatermarkGeneratorFunctionContext` ?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossProjectRule.java
##########
@@ -36,12 +36,13 @@
 import java.util.List;
 
 /**
- * WatermarkAssignerProjectTransposeRule.
+ * Planner rule that push {@link LogicalWatermarkAssigner} into a {@link 
LogicalTableScan}
+ * which wraps a {@link SupportsWatermarkPushDown} dynamic table source across 
{@link LogicalProject}.
  * */
-public class WatermarkAssignerProjectTransposeRule extends RelOptRule {
-       public static final WatermarkAssignerProjectTransposeRule INSTANCE = 
new WatermarkAssignerProjectTransposeRule();
+public class PushWatermarkIntoTableSourceScanAcrossProjectRule extends 
PushWatermarkIntoTableSourceScanBaseRule {
+       public static final PushWatermarkIntoTableSourceScanAcrossProjectRule 
INSTANCE = new PushWatermarkIntoTableSourceScanAcrossProjectRule();
 
-       public WatermarkAssignerProjectTransposeRule() {
+       public PushWatermarkIntoTableSourceScanAcrossProjectRule() {

Review comment:
       please update the rule description

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. If the top level {@link LogicalProject}
+ * doesn't need rowtime column that is used by {@link 
LogicalWatermarkAssigner}, the rule will still keep the top level
+ * {@link LogicalProject} as a pruner.
+ */
+public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule {
+
+       public static final FlinkProjectWatermarkAssignerTransposeRule INSTANCE 
= new FlinkProjectWatermarkAssignerTransposeRule();
+
+       public FlinkProjectWatermarkAssignerTransposeRule() {
+               super(operand(LogicalProject.class,
+                               operand(LogicalWatermarkAssigner.class, any())),
+                               "FlinkProjectWatermarkAssignerTransposeRule");
+       }
+
+       /**
+        * If the rule has been applied, the projection and filter will be 
moved to the low level {@link LogicalProject} and
+        * the top level {@link LogicalProject} will only works as a pruner if 
exists. Therefore, only when top level
+        * {@link LogicalProject} has calculation or needs to prune columns 
except for rowtime, the process will apply the rule.
+        * In some situations, the query keeps the rowtime info and the top 
level {@link LogicalProject} has the same field
+        * count as the input of the {@link LogicalWatermarkAssigner}.
+        * */
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+               // check RexNode type
+               boolean allRef = project.getProjects().stream().allMatch(node 
-> (node instanceof RexInputRef));
+               if (!allRef) {
+                       return true;
+               }
+
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               int rowTimeIndexInProject = 
indexOfRowtime(project.getProjects(), rowTimeIndex);
+               if (rowTimeIndexInProject != -1) {
+                       return project.getRowType().getFieldCount() != 
watermarkAssigner.getRowType().getFieldCount();

Review comment:
       maybe the projection outputs the deduplicate fields, such as: a, a, a, 
b, b
   
   you should push down the used fields in the projection.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
##########
@@ -154,7 +200,9 @@ class WatermarkGeneratorCodeGenTest {
     assertTrue(JavaFunc5.closeCalled)
   }
 
-  private def generateWatermarkGenerator(expr: String): WatermarkGenerator = {
+

Review comment:
       nit: redundant line

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. If the top level {@link LogicalProject}
+ * doesn't need rowtime column that is used by {@link 
LogicalWatermarkAssigner}, the rule will still keep the top level
+ * {@link LogicalProject} as a pruner.
+ */
+public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule {
+
+       public static final FlinkProjectWatermarkAssignerTransposeRule INSTANCE 
= new FlinkProjectWatermarkAssignerTransposeRule();
+
+       public FlinkProjectWatermarkAssignerTransposeRule() {
+               super(operand(LogicalProject.class,
+                               operand(LogicalWatermarkAssigner.class, any())),
+                               "FlinkProjectWatermarkAssignerTransposeRule");
+       }
+
+       /**
+        * If the rule has been applied, the projection and filter will be 
moved to the low level {@link LogicalProject} and
+        * the top level {@link LogicalProject} will only works as a pruner if 
exists. Therefore, only when top level
+        * {@link LogicalProject} has calculation or needs to prune columns 
except for rowtime, the process will apply the rule.
+        * In some situations, the query keeps the rowtime info and the top 
level {@link LogicalProject} has the same field
+        * count as the input of the {@link LogicalWatermarkAssigner}.
+        * */
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+               // check RexNode type
+               boolean allRef = project.getProjects().stream().allMatch(node 
-> (node instanceof RexInputRef));
+               if (!allRef) {
+                       return true;
+               }
+
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               int rowTimeIndexInProject = 
indexOfRowtime(project.getProjects(), rowTimeIndex);
+               if (rowTimeIndexInProject != -1) {
+                       return project.getRowType().getFieldCount() != 
watermarkAssigner.getRowType().getFieldCount();
+               } else {
+                       return (watermarkAssigner.getRowType().getFieldCount() 
- project.getRowType().getFieldCount()) != 1;
+               }
+
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               // whether rowtime field is in the top level projection
+               RelNode originInput = watermarkAssigner.getInput();
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               RelDataType originRowTimeType = 
originInput.getRowType().getFieldList().get(rowTimeIndex).getValue();
+               int rowTimeIndexInTopLevelProject = 
indexOfRowtime(project.getProjects(), rowTimeIndex);
+
+               // get projects and data type of the transposed LogicalProject
+               FlinkTypeFactory typeFactory = FlinkTypeFactory.INSTANCE();
+               List<RexNode> projectsWithRowtime = 
rewriteRowtimeType(project.getProjects(), rowTimeIndex, originRowTimeType);
+               List<String> transposedProjectFieldNames =
+                               project.getNamedProjects().stream().map(pair -> 
pair.right).collect(Collectors.toList());
+               String rowTimeName = 
originInput.getRowType().getFieldNames().get(rowTimeIndex);
+               int rowTimeIndexInTranposedProject;
+               if (rowTimeIndexInTopLevelProject == -1) {
+                       projectsWithRowtime.add(new RexInputRef(rowTimeIndex, 
originRowTimeType));
+
+                       rowTimeIndexInTranposedProject = 
transposedProjectFieldNames.size();
+                       transposedProjectFieldNames.add(rowTimeName);
+               } else {
+                       rowTimeIndexInTranposedProject = 
rowTimeIndexInTopLevelProject;
+               }
+               RelDataType transposedProjectType = 
typeFactory.createStructType(
+                               
projectsWithRowtime.stream().map(RexNode::getType).collect(Collectors.toList()),
+                               transposedProjectFieldNames);
+
+               // build the transposed LogicalProjection
+               LogicalProject transposedProject = 
project.copy(project.getTraitSet(), originInput, projectsWithRowtime, 
transposedProjectType);
+
+               // prepare for rewrite
+               NestedSchema nestedSchema = 
NestedProjectionUtil.build(projectsWithRowtime, originInput.getRowType());
+               if (rowTimeIndexInTopLevelProject == -1) {
+                       NestedColumn rowtimeColumn = 
NestedProjectionUtil.createNestedColumnLeaf(rowTimeName, rowTimeIndex, 
originRowTimeType);
+                       nestedSchema.columns().put(rowTimeName, rowtimeColumn);
+               }
+               // label by hand
+               
nestedSchema.columns().get(rowTimeName).setIndex(rowTimeIndexInTranposedProject);
+
+               // build the LogicalWatermarkAssigner
+               RexBuilder builder = call.builder().getRexBuilder();
+               RexNode newWatermarkExpr =
+                               NestedProjectionUtil.rewrite(
+                                               
Collections.singletonList(watermarkAssigner.watermarkExpr()),
+                                               nestedSchema,
+                                               builder).get(0);
+               LogicalWatermarkAssigner newWatermarkAssigner = 
LogicalWatermarkAssigner.create(
+                               watermarkAssigner.getCluster(),
+                               transposedProject,
+                               rowTimeIndexInTranposedProject,
+                               newWatermarkExpr);
+
+               // build the origin top level LogicalProjection
+               List<RexNode> newProjects = new 
ArrayList<>(project.getProjects().size());
+               for (int i = 0; i < project.getProjects().size(); i++) {
+                       newProjects.add(new RexInputRef(i, 
project.getProjects().get(i).getType()));
+               }
+               LogicalProject newProject = project.copy(project.getTraitSet(), 
newWatermarkAssigner, newProjects, project.getRowType());
+
+               if (ProjectRemoveRule.isTrivial(newProject)) {
+                       // drop project if the transformed program merely 
returns its input
+                       call.transformTo(newWatermarkAssigner);
+               } else {
+                       call.transformTo(newProject);
+               }
+       }
+
+       private static int indexOfRowtime(List<RexNode> projects, int 
rowTimeIndex) {
+               for (int i = 0; i < projects.size(); i++) {
+                       RexNode project = projects.get(i);
+                       if (project instanceof RexInputRef) {
+                               if (((RexInputRef) project).getIndex() == 
rowTimeIndex) {
+                                       return i;
+                               }
+                       }
+               }
+               return -1;
+       }
+
+       private static List<RexNode> rewriteRowtimeType(List<RexNode> projects, 
int rowTimeIndex, RelDataType originType) {

Review comment:
       originType => rowtimeType

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/codegen/WatermarkGeneratorCodeGenTest.scala
##########
@@ -136,9 +145,46 @@ class WatermarkGeneratorCodeGenTest {
         "myFunc"),
       new JavaFunc5
     )
-    val generator = generateWatermarkGenerator("myFunc(ts, `offset`)")
-    // mock open and close invoking
-    generator.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 1))
+    val generator = generateWatermarkGenerator("myFunc(ts, `offset`)",
+      useDefinedConstructor)
+    if (!useDefinedConstructor) {
+      // mock open and close invoking
+      generator.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 1))
+    }
+    generator.open(new Configuration())
+    val results = data.map(d => generator.currentWatermark(d))
+    generator.close()
+    val expected = List(
+      JLong.valueOf(995L),
+      null,
+      null,
+      JLong.valueOf(4997L),
+      JLong.valueOf(3990L),
+      JLong.valueOf(5992L))
+    assertEquals(expected, results)
+    assertTrue(JavaFunc5.openCalled)
+    assertTrue(JavaFunc5.closeCalled)
+  }
+
+  @Test
+  def testCustomizedWatermark(): Unit = {

Review comment:
       extract the common code with `testLegacyCustomizedWatermark` and 
`testCustomizedWatermark ` to another method ?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. If the top level {@link LogicalProject}
+ * doesn't need rowtime column that is used by {@link 
LogicalWatermarkAssigner}, the rule will still keep the top level
+ * {@link LogicalProject} as a pruner.
+ */
+public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule {

Review comment:
       remove the prefix `Flink`, it can be simplified as 
`ProjectWatermarkAssignerTransposeRule`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/FlinkProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. If the top level {@link LogicalProject}
+ * doesn't need rowtime column that is used by {@link 
LogicalWatermarkAssigner}, the rule will still keep the top level
+ * {@link LogicalProject} as a pruner.
+ */
+public class FlinkProjectWatermarkAssignerTransposeRule extends RelOptRule {
+
+       public static final FlinkProjectWatermarkAssignerTransposeRule INSTANCE 
= new FlinkProjectWatermarkAssignerTransposeRule();
+
+       public FlinkProjectWatermarkAssignerTransposeRule() {
+               super(operand(LogicalProject.class,
+                               operand(LogicalWatermarkAssigner.class, any())),
+                               "FlinkProjectWatermarkAssignerTransposeRule");
+       }
+
+       /**
+        * If the rule has been applied, the projection and filter will be 
moved to the low level {@link LogicalProject} and
+        * the top level {@link LogicalProject} will only works as a pruner if 
exists. Therefore, only when top level
+        * {@link LogicalProject} has calculation or needs to prune columns 
except for rowtime, the process will apply the rule.
+        * In some situations, the query keeps the rowtime info and the top 
level {@link LogicalProject} has the same field
+        * count as the input of the {@link LogicalWatermarkAssigner}.
+        * */
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+               // check RexNode type
+               boolean allRef = project.getProjects().stream().allMatch(node 
-> (node instanceof RexInputRef));
+               if (!allRef) {
+                       return true;
+               }
+
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               int rowTimeIndexInProject = 
indexOfRowtime(project.getProjects(), rowTimeIndex);
+               if (rowTimeIndexInProject != -1) {
+                       return project.getRowType().getFieldCount() != 
watermarkAssigner.getRowType().getFieldCount();
+               } else {
+                       return (watermarkAssigner.getRowType().getFieldCount() 
- project.getRowType().getFieldCount()) != 1;
+               }
+
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               // whether rowtime field is in the top level projection
+               RelNode originInput = watermarkAssigner.getInput();
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               RelDataType originRowTimeType = 
originInput.getRowType().getFieldList().get(rowTimeIndex).getValue();
+               int rowTimeIndexInTopLevelProject = 
indexOfRowtime(project.getProjects(), rowTimeIndex);
+
+               // get projects and data type of the transposed LogicalProject
+               FlinkTypeFactory typeFactory = FlinkTypeFactory.INSTANCE();

Review comment:
       use `project.getCluster().getTypeFactory()`, and move it close to the 
place where it is used




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to