godfreyhe commented on a change in pull request #13449:
URL: https://github.com/apache/flink/pull/13449#discussion_r518559131



##########
File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
##########
@@ -572,7 +556,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1
       +- Calc(select=[b, a, c])
          +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
             +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
-               +- Calc(select=[a, b, c, PROCTIME() AS proctime, rowtime])
+               +- Calc(select=[b, a, c, rowtime])

Review comment:
       Has the projection not been pushed down here?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. The transposed {@link LogicalProject}
+ * works like a pruner to prune the unused fields from source. The top level 
{@link LogicalProject} still has to do the
+ * calculation, filter and prune the rowtime column if the query doesn't need.
+ *
+ * <p>NOTES: Currently the rule doesn't support nested projection push down.
+ */
+public class ProjectWatermarkAssignerTransposeRule extends RelOptRule {
+
+       public static final ProjectWatermarkAssignerTransposeRule INSTANCE = 
new ProjectWatermarkAssignerTransposeRule();
+
+       public ProjectWatermarkAssignerTransposeRule() {
+               super(operand(LogicalProject.class,
+                               operand(LogicalWatermarkAssigner.class, any())),
+                               "FlinkProjectWatermarkAssignerTransposeRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               String rowTimeName = 
watermarkAssigner.getRowType().getFieldNames().get(rowTimeIndex);
+               NestedSchema schema = 
NestedProjectionUtil.build(project.getProjects(), 
project.getInput().getRowType());
+
+               // The field count difference between the used column in the 
input and in top level projection is always non-negative.
+               // At the beginning, the optimization hasn't applied the rule, 
the input has more columns.
+               // After the rule is applied, it will always push the used 
column (including the rowtime) under the watermark assigner.
+               // Therefore, if the used columns in the input are as same as 
the input, it doesn't need apply the rule.
+
+               // For nested projection, it needs one more check: all top 
level column in nestedSchema are leaves.
+               if (schema.columns().containsKey(rowTimeName)) {
+                       return schema.columns().size() != 
watermarkAssigner.getInput().getRowType().getFieldCount();
+               } else {
+                       return 
watermarkAssigner.getInput().getRowType().getFieldCount() - 
schema.columns().size() != 1;
+               }
+
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               NestedSchema nestedSchema = 
NestedProjectionUtil.build(project.getProjects(), 
watermarkAssigner.getRowType());
+               RelBuilder builder = 
call.builder().push(watermarkAssigner.getInput());
+               List<RexInputRef> transposedProjects = new LinkedList<>();
+               List<String> usedNames = new LinkedList<>();
+
+               // TODO: support nested projection push down in transpose
+               // add the used column RexInputRef and names into list
+               for (NestedColumn column: nestedSchema.columns().values()) {
+                       // mark by hand
+                       column.setIndex(transposedProjects.size());

Review comment:
       It would be better to rename `setIndex` to `setIndexOfLeafInNewSchema`.
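
       A sketch of how the loop body would read after the rename (the setter name follows the suggestion above and pairs with the existing `indexOfLeafInNewSchema()` getter; everything else mirrors the patch):

```java
for (NestedColumn column : nestedSchema.columns().values()) {
    // record where this leaf lands in the transposed (pruning) projection
    column.setIndexOfLeafInNewSchema(transposedProjects.size());
    column.markLeaf();

    usedNames.add(column.name());
    transposedProjects.add(builder.field(column.indexInOriginSchema()));
}
```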

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}.
+ * The rule will first look for the computed column in the {@link 
FlinkLogicalCalc} and then translate the watermark expression
+ * and the computed column into a {@link WatermarkStrategy}. With the new scan 
the rule will build a new {@link FlinkLogicalCalc}.
+ */
+public class PushWatermarkIntoTableSourceScanAcrossCalcRule extends 
PushWatermarkIntoTableSourceScanRuleBase {
+       public static final PushWatermarkIntoTableSourceScanAcrossCalcRule 
INSTANCE = new PushWatermarkIntoTableSourceScanAcrossCalcRule();
+
+       public PushWatermarkIntoTableSourceScanAcrossCalcRule() {
+               super(operand(FlinkLogicalWatermarkAssigner.class,
+                               operand(FlinkLogicalCalc.class,
+                                               
operand(FlinkLogicalTableSourceScan.class, none()))),
+                               
"PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule");

Review comment:
       "PushWatermarkIntoTableSourceScanAcrossCalcRule"

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleTest.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
+import 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
+import 
org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
+import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.hep.HepMatchOrder;
+import org.apache.calcite.rel.rules.CoreRules;
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+
+/**
+ * Test rule {@link PushWatermarkIntoTableSourceScanAcrossCalcRule} and {@link 
PushWatermarkIntoTableSourceScanRule}.
+ * */
+public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
+       private StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+       @Before
+       public void setup() {
+               FlinkChainedProgram<StreamOptimizeContext> program = new 
FlinkChainedProgram<>();
+               program.addLast(
+                               "Converter",
+                               
FlinkVolcanoProgramBuilder.<StreamOptimizeContext>newBuilder()
+                                               .add(RuleSets.ofList(
+                                                               
CoreRules.PROJECT_TO_CALC,
+                                                               
CoreRules.FILTER_TO_CALC,
+                                                               
FlinkLogicalCalc.CONVERTER(),
+                                                               
FlinkLogicalTableSourceScan.CONVERTER(),
+                                                               
FlinkLogicalWatermarkAssigner.CONVERTER()
+                                                               ))
+                                               .setRequiredOutputTraits(new 
Convention[] {FlinkConventions.LOGICAL()})
+                                               .build()
+               );
+               program.addLast(
+                               "PushWatermarkIntoTableSourceScanRule",
+                               
FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder()
+                                               
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
+                                               
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
+                                               .add(RuleSets.ofList(
+                                                               
PushWatermarkIntoTableSourceScanRule.INSTANCE,
+                                                               
PushWatermarkIntoTableSourceScanAcrossCalcRule.INSTANCE))
+                                               .build()
+               );
+               util.replaceStreamProgram(program);
+       }
+
+       @Test
+       public void testSimpleWatermark() {
+               String ddl =
+                               "CREATE TABLE MyTable(" +
+                                               "  a INT,\n" +
+                                               "  b BIGINT,\n" +
+                                               "  c TIMESTAMP(3),\n" +
+                                               "  WATERMARK FOR c AS c - 
INTERVAL '5' SECOND\n" +
+                                               ") WITH (\n" +
+                                               "  'connector' = 'values',\n" +
+                                               "  'enable-watermark-push-down' 
= 'true',\n" +
+                                               "  'bounded' = 'false',\n" +
+                                               "  'disable-lookup' = 'true'" +
+                                               ")";
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("select a, c from MyTable");
+       }
+
+       @Test
+       public void testWatermarkOnComputedColumn() {
+               String ddl =
+                               "CREATE TABLE MyTable(" +
+                                               "  a INT,\n" +
+                                               "  b BIGINT,\n" +
+                                               "  c TIMESTAMP(3),\n" +
+                                               "  d AS c + INTERVAL '5' 
SECOND,\n" +
+                                               "  WATERMARK FOR d AS d - 
INTERVAL '5' SECOND\n" +
+                                               ") WITH (\n" +
+                                               " 'connector' = 'values',\n" +
+                                               " 'enable-watermark-push-down' 
= 'true',\n" +
+                                               " 'bounded' = 'false',\n" +
+                                               " 'disable-lookup' = 'true'" +
+                                               ")";
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("SELECT * from MyTable");
+       }
+
+       @Test
+       public void testWatermarkOnComputedColumnWithQuery() {
+               String ddl =
+                               "CREATE TABLE MyTable(" +
+                                               "  a INT,\n" +
+                                               "  b BIGINT,\n" +
+                                               "  c TIMESTAMP(3) NOT NULL,\n" +
+                                               "  d AS c + INTERVAL '5' 
SECOND,\n" +
+                                               "  WATERMARK FOR d AS d - 
INTERVAL '5' SECOND\n" +
+                                               ") WITH (\n" +
+                                               "  'connector' = 'values',\n" +
+                                               "  'enable-watermark-push-down' 
= 'true',\n" +
+                                               "  'bounded' = 'false',\n" +
+                                               "  'disable-lookup' = 'true'" +
+                                               ")";
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("SELECT a, b FROM MyTable WHERE d > 
TO_TIMESTAMP('2020-10-09 12:12:12')");
+       }
+
+       @Test
+       public void testWatermarkOnComputedColumnWithMultipleInputs() {
+               String ddl =
+                               "CREATE TABLE MyTable(" +
+                                               "  a STRING,\n" +
+                                               "  b STRING,\n" +
+                                               "  c as TO_TIMESTAMP(a, b),\n" +
+                                               "  WATERMARK FOR c AS c - 
INTERVAL '5' SECOND\n" +
+                                               ") WITH (\n" +
+                                               "  'connector' = 'values',\n" +
+                                               "  'enable-watermark-push-down' 
= 'true',\n" +
+                                               "  'bounded' = 'false',\n" +
+                                               "  'disable-lookup' = 'true'" +
+                                               ")";
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("SELECT * FROM MyTable");
+       }
+
+       @Test
+       public void testWatermarkOnRow() {
+               String ddl =
+                               "CREATE TABLE MyTable(" +
+                                               "  a INT,\n" +
+                                               "  b BIGINT,\n" +
+                                               "  c ROW<name STRING, d 
TIMESTAMP(3)>," +
+                                               "  e AS c.d," +
+                                               "  WATERMARK FOR e AS e - 
INTERVAL '5' SECOND\n" +
+                                               ") WITH (\n" +
+                                               "  'connector' = 'values',\n" +
+                                               "  'enable-watermark-push-down' 
= 'true',\n" +
+                                               "  'bounded' = 'false',\n" +
+                                               "  'disable-lookup' = 'true'" +
+                                               ")";
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("SELECT * FROM MyTable");
+       }
+
+       @Test
+       public void testWatermarkOnNestedRow() {
+               String ddl =
+                               "CREATE TABLE MyTable(" +
+                                               "  a INT,\n" +
+                                               "  b BIGINT,\n" +
+                                               "  c ROW<name STRING, d row<e 
STRING, f TIMESTAMP(3)>>," +
+                                               "  g as c.d.f," +
+                                               "  WATERMARK for g as g - 
INTERVAL '5' SECOND\n" +
+                                               ") WITH (\n" +
+                                               "  'connector' = 'values',\n" +
+                                               "  'enable-watermark-push-down' 
= 'true',\n" +
+                                               "  'bounded' = 'false',\n" +
+                                               "  'disable-lookup' = 'true'" +
+                                               ")";
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("SELECT * FROM MyTable");
+       }
+
+       @Test
+       public void testWatermarkWithUdf() {
+               util.addFunction("func1", new InnerUdf());
+               String ddl =
+                               "CREATE TABLE MyTable(" +
+                                               "  a INT,\n" +
+                                               "  b BIGINT,\n" +
+                                               "  c TIMESTAMP(3)," +
+                                               "  d AS func1(c)," +
+                                               "  WATERMARK FOR d AS d - 
INTERVAL '5' SECOND\n" +
+                                               ") WITH (\n" +
+                                               "  'connector' = 'values',\n" +
+                                               "  'enable-watermark-push-down' 
= 'true',\n" +
+                                               "  'bounded' = 'false',\n" +
+                                               "  'disable-lookup' = 'true'" +
+                                               ")";
+               util.tableEnv().executeSql(ddl);
+               util.verifyPlan("SELECT * FROM MyTable");
+       }
+
+       /**
+        * Udf for test.
+        * */
+       public static class InnerUdf extends ScalarFunction {

Review comment:
       Define this class in `JavaUserDefinedScalarFunctions` instead.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. The transposed {@link LogicalProject}
+ * works like a pruner to prune the unused fields from source. The top level 
{@link LogicalProject} still has to do the
+ * calculation, filter and prune the rowtime column if the query doesn't need.
+ *
+ * <p>NOTES: Currently the rule doesn't support nested projection push down.
+ */
+public class ProjectWatermarkAssignerTransposeRule extends RelOptRule {
+
+       public static final ProjectWatermarkAssignerTransposeRule INSTANCE = 
new ProjectWatermarkAssignerTransposeRule();
+
+       public ProjectWatermarkAssignerTransposeRule() {
+               super(operand(LogicalProject.class,
+                               operand(LogicalWatermarkAssigner.class, any())),
+                               "FlinkProjectWatermarkAssignerTransposeRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               String rowTimeName = 
watermarkAssigner.getRowType().getFieldNames().get(rowTimeIndex);
+               NestedSchema schema = 
NestedProjectionUtil.build(project.getProjects(), 
project.getInput().getRowType());
+
+               // The field count difference between the used column in the 
input and in top level projection is always non-negative.
+               // At the beginning, the optimization hasn't applied the rule, 
the input has more columns.
+               // After the rule is applied, it will always push the used 
column (including the rowtime) under the watermark assigner.
+               // Therefore, if the used columns in the input are as same as 
the input, it doesn't need apply the rule.
+
+               // For nested projection, it needs one more check: all top 
level column in nestedSchema are leaves.
+               if (schema.columns().containsKey(rowTimeName)) {
+                       return schema.columns().size() != 
watermarkAssigner.getInput().getRowType().getFieldCount();
+               } else {
+                       return 
watermarkAssigner.getInput().getRowType().getFieldCount() - 
schema.columns().size() != 1;
+               }
+
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               NestedSchema nestedSchema = 
NestedProjectionUtil.build(project.getProjects(), 
watermarkAssigner.getRowType());
+               RelBuilder builder = 
call.builder().push(watermarkAssigner.getInput());
+               List<RexInputRef> transposedProjects = new LinkedList<>();
+               List<String> usedNames = new LinkedList<>();
+
+               // TODO: support nested projection push down in transpose
+               // add the used column RexInputRef and names into list
+               for (NestedColumn column: nestedSchema.columns().values()) {
+                       // mark by hand
+                       column.setIndex(transposedProjects.size());
+                       column.markLeaf();
+
+                       usedNames.add(column.name());
+                       
transposedProjects.add(builder.field(column.indexInOriginSchema()));
+               }
+
+               // get the rowtime field index in the transposed project
+               String rowTimeName = 
watermarkAssigner.getRowType().getFieldNames().get(watermarkAssigner.rowtimeFieldIndex());
+               int indexOfRowTimeInTransposedProject;
+               if (nestedSchema.columns().get(rowTimeName) == null) {
+                       // push the RexInputRef of the rowtime into the list
+                       int rowTimeIndexInInput = 
watermarkAssigner.rowtimeFieldIndex();
+                       indexOfRowTimeInTransposedProject = 
transposedProjects.size();
+                       
transposedProjects.add(builder.field(rowTimeIndexInInput));
+                       usedNames.add(rowTimeName);
+               } else {
+                       //find rowtime ref in the list and mark the location
+                       indexOfRowTimeInTransposedProject = 
nestedSchema.columns().get(rowTimeName).indexOfLeafInNewSchema();
+               }
+
+               // the rowtime column has no rowtime indicator
+               LogicalProject transposedProject =
+                               
LogicalProject.create(watermarkAssigner.getInput(), project.getHints(), 
transposedProjects, usedNames);
+
+               RexNode newWatermarkExpr = 
watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               return new 
RexInputRef(indexOfRowTimeInTransposedProject, inputRef.getType());
+                       }
+               });
+
+               LogicalWatermarkAssigner newWatermarkAssigner = 
LogicalWatermarkAssigner.create(

Review comment:
       Consider building this with `builder.watermark(...)` instead of calling `LogicalWatermarkAssigner.create` directly.
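
       A minimal sketch of that simplification, assuming `call.builder()` returns a `FlinkRelBuilder` with a chainable `watermark(rowtimeFieldIndex, watermarkExpr)`; `transposedProject`, `indexOfRowTimeInTransposedProject` and `newWatermarkExpr` are the values already computed above in `onMatch`:

```java
// push the pruning projection and re-create the watermark on top of it
FlinkRelBuilder flinkBuilder = (FlinkRelBuilder) call.builder();
RelNode newWatermarkAssigner = flinkBuilder
        .push(transposedProject)
        .watermark(indexOfRowTimeInTransposedProject, newWatermarkExpr)
        .build();
```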

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. The transposed {@link LogicalProject}
+ * works like a pruner to prune the unused fields from source. The top level 
{@link LogicalProject} still has to do the
+ * calculation, filter and prune the rowtime column if the query doesn't need.
+ *
+ * <p>NOTES: Currently the rule doesn't support nested projection push down.
+ */
+public class ProjectWatermarkAssignerTransposeRule extends RelOptRule {
+
+       public static final ProjectWatermarkAssignerTransposeRule INSTANCE = 
new ProjectWatermarkAssignerTransposeRule();
+
+       public ProjectWatermarkAssignerTransposeRule() {
+               super(operand(LogicalProject.class,
+                               operand(LogicalWatermarkAssigner.class, any())),
+                               "FlinkProjectWatermarkAssignerTransposeRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               String rowTimeName = 
watermarkAssigner.getRowType().getFieldNames().get(rowTimeIndex);
+               NestedSchema schema = 
NestedProjectionUtil.build(project.getProjects(), 
project.getInput().getRowType());
+
+               // The field count difference between the used column in the 
input and in top level projection is always non-negative.
+               // At the beginning, the optimization hasn't applied the rule, 
the input has more columns.
+               // After the rule is applied, it will always push the used 
column (including the rowtime) under the watermark assigner.
+               // Therefore, if the used columns in the input are as same as 
the input, it doesn't need apply the rule.
+
+               // For nested projection, it needs one more check: all top 
level column in nestedSchema are leaves.
+               if (schema.columns().containsKey(rowTimeName)) {
+                       return schema.columns().size() != 
watermarkAssigner.getInput().getRowType().getFieldCount();
+               } else {
+                       return 
watermarkAssigner.getInput().getRowType().getFieldCount() - 
schema.columns().size() != 1;
+               }
+
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               NestedSchema nestedSchema = 
NestedProjectionUtil.build(project.getProjects(), 
watermarkAssigner.getRowType());
+               RelBuilder builder = 
call.builder().push(watermarkAssigner.getInput());
+               List<RexInputRef> transposedProjects = new LinkedList<>();
+               List<String> usedNames = new LinkedList<>();
+
+               // TODO: support nested projection push down in transpose
+               // add the used column RexInputRef and names into list
+               for (NestedColumn column: nestedSchema.columns().values()) {
+                       // mark by hand
+                       column.setIndex(transposedProjects.size());
+                       column.markLeaf();
+
+                       usedNames.add(column.name());
+                       
transposedProjects.add(builder.field(column.indexInOriginSchema()));
+               }
+
+               // get the rowtime field index in the transposed project
+               String rowTimeName = 
watermarkAssigner.getRowType().getFieldNames().get(watermarkAssigner.rowtimeFieldIndex());
+               int indexOfRowTimeInTransposedProject;
+               if (nestedSchema.columns().get(rowTimeName) == null) {
+                       // push the RexInputRef of the rowtime into the list
+                       int rowTimeIndexInInput = 
watermarkAssigner.rowtimeFieldIndex();
+                       indexOfRowTimeInTransposedProject = 
transposedProjects.size();
+                       
transposedProjects.add(builder.field(rowTimeIndexInInput));
+                       usedNames.add(rowTimeName);
+               } else {
+                       //find rowtime ref in the list and mark the location
+                       indexOfRowTimeInTransposedProject = 
nestedSchema.columns().get(rowTimeName).indexOfLeafInNewSchema();
+               }
+
+               // the rowtime column has no rowtime indicator
+               LogicalProject transposedProject =

Review comment:
       Consider `builder.project(transposedProjects, usedNames)` here instead of `LogicalProject.create`.
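
       A sketch under the assumption that `builder` still has `watermarkAssigner.getInput()` pushed onto it from the top of `onMatch`:

```java
// build the pruning projection directly from the RelBuilder instead of
// assembling a LogicalProject by hand
RelNode transposedProject = builder
        .project(transposedProjects, usedNames)
        .build();
```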

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. The transposed {@link LogicalProject}
+ * works like a pruner to prune the unused fields from source. The top level 
{@link LogicalProject} still has to do the
+ * calculation, filter and prune the rowtime column if the query doesn't need.
+ *
+ * <p>NOTES: Currently the rule doesn't support nested projection push down.
+ */
+public class ProjectWatermarkAssignerTransposeRule extends RelOptRule {
+
+       public static final ProjectWatermarkAssignerTransposeRule INSTANCE = 
new ProjectWatermarkAssignerTransposeRule();
+
+       public ProjectWatermarkAssignerTransposeRule() {
+               super(operand(LogicalProject.class,
+                               operand(LogicalWatermarkAssigner.class, any())),
+                               "FlinkProjectWatermarkAssignerTransposeRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               String rowTimeName = 
watermarkAssigner.getRowType().getFieldNames().get(rowTimeIndex);
+               NestedSchema schema = 
NestedProjectionUtil.build(project.getProjects(), 
project.getInput().getRowType());
+
+               // The field count difference between the used column in the 
input and in top level projection is always non-negative.
+               // At the beginning, the optimization hasn't applied the rule, 
the input has more columns.
+               // After the rule is applied, it will always push the used 
column (including the rowtime) under the watermark assigner.
+               // Therefore, if the used columns in the input are as same as 
the input, it doesn't need apply the rule.
+
+               // For nested projection, it needs one more check: all top 
level column in nestedSchema are leaves.
+               if (schema.columns().containsKey(rowTimeName)) {
+                       return schema.columns().size() != 
watermarkAssigner.getInput().getRowType().getFieldCount();
+               } else {
+                       return 
watermarkAssigner.getInput().getRowType().getFieldCount() - 
schema.columns().size() != 1;
+               }
+
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               NestedSchema nestedSchema = 
NestedProjectionUtil.build(project.getProjects(), 
watermarkAssigner.getRowType());
+               RelBuilder builder = 
call.builder().push(watermarkAssigner.getInput());
+               List<RexInputRef> transposedProjects = new LinkedList<>();
+               List<String> usedNames = new LinkedList<>();
+
+               // TODO: support nested projection push down in transpose
+               // add the used column RexInputRef and names into list
+               for (NestedColumn column: nestedSchema.columns().values()) {
+                       // mark by hand
+                       column.setIndex(transposedProjects.size());
+                       column.markLeaf();
+
+                       usedNames.add(column.name());
+                       
transposedProjects.add(builder.field(column.indexInOriginSchema()));
+               }
+
+               // get the rowtime field index in the transposed project
+               String rowTimeName = 
watermarkAssigner.getRowType().getFieldNames().get(watermarkAssigner.rowtimeFieldIndex());
+               int indexOfRowTimeInTransposedProject;
+               if (nestedSchema.columns().get(rowTimeName) == null) {
+                       // push the RexInputRef of the rowtime into the list
+                       int rowTimeIndexInInput = 
watermarkAssigner.rowtimeFieldIndex();
+                       indexOfRowTimeInTransposedProject = 
transposedProjects.size();
+                       
transposedProjects.add(builder.field(rowTimeIndexInInput));
+                       usedNames.add(rowTimeName);
+               } else {
+                       //find rowtime ref in the list and mark the location
+                       indexOfRowTimeInTransposedProject = 
nestedSchema.columns().get(rowTimeName).indexOfLeafInNewSchema();
+               }
+
+               // the rowtime column has no rowtime indicator
+               LogicalProject transposedProject =
+                               
LogicalProject.create(watermarkAssigner.getInput(), project.getHints(), 
transposedProjects, usedNames);
+
+               RexNode newWatermarkExpr = 
watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               return new 
RexInputRef(indexOfRowTimeInTransposedProject, inputRef.getType());
+                       }

Review comment:
       Overwrite the input ref only if its index equals the rowtime field index; other refs should be returned unchanged.
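
       A sketch of the guarded rewrite; it is the shuttle from the patch with the extra index check added, so refs other than the rowtime column pass through untouched:

```java
RexNode newWatermarkExpr = watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
    @Override
    public RexNode visitInputRef(RexInputRef inputRef) {
        // only the reference to the rowtime column is re-pointed at its new index
        if (inputRef.getIndex() == watermarkAssigner.rowtimeFieldIndex()) {
            return new RexInputRef(indexOfRowTimeInTransposedProject, inputRef.getType());
        }
        return inputRef;
    }
});
```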

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanAcrossCalcRule.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.util.Pair;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Rule to push the {@link FlinkLogicalWatermarkAssigner} across the {@link 
FlinkLogicalCalc} to the {@link FlinkLogicalTableSourceScan}.
+ * The rule will first look for the computed column in the {@link 
FlinkLogicalCalc} and then translate the watermark expression
+ * and the computed column into a {@link WatermarkStrategy}. With the new scan 
the rule will build a new {@link FlinkLogicalCalc}.
+ */
+public class PushWatermarkIntoTableSourceScanAcrossCalcRule extends 
PushWatermarkIntoTableSourceScanRuleBase {
+       public static final PushWatermarkIntoTableSourceScanAcrossCalcRule 
INSTANCE = new PushWatermarkIntoTableSourceScanAcrossCalcRule();
+
+       public PushWatermarkIntoTableSourceScanAcrossCalcRule() {
+               super(operand(FlinkLogicalWatermarkAssigner.class,
+                               operand(FlinkLogicalCalc.class,
+                                               
operand(FlinkLogicalTableSourceScan.class, none()))),
+                               
"PushWatermarkIntoFlinkTableSourceScanAcrossProjectRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               FlinkLogicalTableSourceScan scan = call.rel(2);
+               return supportsWatermarkPushDown(scan);
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               FlinkLogicalWatermarkAssigner watermarkAssigner = call.rel(0);
+               FlinkLogicalCalc calc = call.rel(1);
+
+               RexProgram originProgram = calc.getProgram();
+               List<RexNode> projectList = 
originProgram.getProjectList().stream()
+                               .map(originProgram::expandLocalRef)
+                               .collect(Collectors.toList());
+
+               //get watermark expression
+               RexNode computedColumn = 
projectList.get(watermarkAssigner.rowtimeFieldIndex());
+               RexNode newWatermarkExpr = 
watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               // replace the input ref of the rowtime with 
the computed column
+                               if (inputRef.getIndex() == 
watermarkAssigner.rowtimeFieldIndex()) {
+                                       return computedColumn;
+                               } else {
+                                       return inputRef;
+                               }
+                       }
+               });
+
+               // push watermark assigner into the scan
+               FlinkLogicalTableSourceScan newScan =
+                               getNewScan(watermarkAssigner, newWatermarkExpr, 
call.rel(2), ((FlinkContext) call.getPlanner().getContext()).getTableConfig());
+
+               FlinkTypeFactory typeFactory = (FlinkTypeFactory) 
watermarkAssigner.getCluster().getTypeFactory();
+               RexBuilder builder = call.builder().getRexBuilder();
+               // cast timestamp type to rowtime type.
+               RexNode newComputedColumn = builder.makeReinterpretCast(
+                               
typeFactory.createRowtimeIndicatorType(computedColumn.getType().isNullable()),
+                               computedColumn,
+                               null);
+
+               // build new calc program
+               RexProgramBuilder programBuilder = new 
RexProgramBuilder(newScan.getRowType(), builder);
+
+               for (int i = 0; i < projectList.size(); i++) {
+                       Pair<RexLocalRef, String> rexLocalRefStringPair = 
originProgram.getNamedProjects().get(i);

Review comment:
       Use the corresponding element from `projectList` instead of re-reading it from `originProgram.getNamedProjects()`.
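
       A sketch of the suggested loop, reusing the already-expanded expressions in `projectList` and taking the output names from the old program; how the rowtime column is swapped for `newComputedColumn` is not visible in this hunk and would stay as in the patch:

```java
for (int i = 0; i < projectList.size(); i++) {
    // the expressions were already expanded via expandLocalRef above
    String fieldName = originProgram.getOutputRowType().getFieldNames().get(i);
    programBuilder.addProject(projectList.get(i), fieldName);
}
```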

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWatermarkAssignerTransposeRule.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.logical;
+
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.utils.NestedColumn;
+import org.apache.flink.table.planner.plan.utils.NestedProjectionUtil;
+import org.apache.flink.table.planner.plan.utils.NestedSchema;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Transpose between the {@link LogicalWatermarkAssigner} and {@link 
LogicalProject}. The transposed {@link LogicalProject}
+ * works like a pruner to prune the unused fields from source. The top level 
{@link LogicalProject} still has to do the
+ * calculation, filter and prune the rowtime column if the query doesn't need.
+ *
+ * <p>NOTES: Currently the rule doesn't support nested projection push down.
+ */
+public class ProjectWatermarkAssignerTransposeRule extends RelOptRule {
+
+       public static final ProjectWatermarkAssignerTransposeRule INSTANCE = 
new ProjectWatermarkAssignerTransposeRule();
+
+       public ProjectWatermarkAssignerTransposeRule() {
+               super(operand(LogicalProject.class,
+                               operand(LogicalWatermarkAssigner.class, any())),
+                               "FlinkProjectWatermarkAssignerTransposeRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               int rowTimeIndex = watermarkAssigner.rowtimeFieldIndex();
+               String rowTimeName = 
watermarkAssigner.getRowType().getFieldNames().get(rowTimeIndex);
+               NestedSchema schema = 
NestedProjectionUtil.build(project.getProjects(), 
project.getInput().getRowType());
+
+               // The field count difference between the used column in the 
input and in top level projection is always non-negative.
+               // At the beginning, the optimization hasn't applied the rule, 
the input has more columns.
+               // After the rule is applied, it will always push the used 
column (including the rowtime) under the watermark assigner.
+               // Therefore, if the used columns in the input are as same as 
the input, it doesn't need apply the rule.
+
+               // For nested projection, it needs one more check: all top 
level column in nestedSchema are leaves.
+               if (schema.columns().containsKey(rowTimeName)) {
+                       return schema.columns().size() != 
watermarkAssigner.getInput().getRowType().getFieldCount();
+               } else {
+                       return 
watermarkAssigner.getInput().getRowType().getFieldCount() - 
schema.columns().size() != 1;
+               }
+
+       }
+
+       @Override
+       public void onMatch(RelOptRuleCall call) {
+               LogicalProject project = call.rel(0);
+               LogicalWatermarkAssigner watermarkAssigner = call.rel(1);
+
+               NestedSchema nestedSchema = 
NestedProjectionUtil.build(project.getProjects(), 
watermarkAssigner.getRowType());
+               RelBuilder builder = 
call.builder().push(watermarkAssigner.getInput());
+               List<RexInputRef> transposedProjects = new LinkedList<>();
+               List<String> usedNames = new LinkedList<>();
+
+               // TODO: support nested projection push down in transpose
+               // add the used column RexInputRef and names into list
+               for (NestedColumn column: nestedSchema.columns().values()) {
+                       // mark by hand
+                       column.setIndex(transposedProjects.size());
+                       column.markLeaf();
+
+                       usedNames.add(column.name());
+                       
transposedProjects.add(builder.field(column.indexInOriginSchema()));
+               }
+
+               // get the rowtime field index in the transposed project
+               String rowTimeName = 
watermarkAssigner.getRowType().getFieldNames().get(watermarkAssigner.rowtimeFieldIndex());
+               int indexOfRowTimeInTransposedProject;
+               if (nestedSchema.columns().get(rowTimeName) == null) {
+                       // push the RexInputRef of the rowtime into the list
+                       int rowTimeIndexInInput = 
watermarkAssigner.rowtimeFieldIndex();
+                       indexOfRowTimeInTransposedProject = 
transposedProjects.size();
+                       
transposedProjects.add(builder.field(rowTimeIndexInInput));
+                       usedNames.add(rowTimeName);
+               } else {
+                       //find rowtime ref in the list and mark the location
+                       indexOfRowTimeInTransposedProject = 
nestedSchema.columns().get(rowTimeName).indexOfLeafInNewSchema();
+               }
+
+               // the rowtime column has no rowtime indicator
+               LogicalProject transposedProject =
+                               
LogicalProject.create(watermarkAssigner.getInput(), project.getHints(), 
transposedProjects, usedNames);
+
+               RexNode newWatermarkExpr = 
watermarkAssigner.watermarkExpr().accept(new RexShuttle() {
+                       @Override
+                       public RexNode visitInputRef(RexInputRef inputRef) {
+                               return new 
RexInputRef(indexOfRowTimeInTransposedProject, inputRef.getType());
+                       }
+               });
+
+               LogicalWatermarkAssigner newWatermarkAssigner = 
LogicalWatermarkAssigner.create(
+                               watermarkAssigner.getCluster(),
+                               transposedProject,
+                               indexOfRowTimeInTransposedProject,
+                               newWatermarkExpr);
+
+               List<RexNode> newProjects = 
NestedProjectionUtil.rewrite(project.getProjects(), nestedSchema, 
call.builder().getRexBuilder());
+               LogicalProject newProject = 
LogicalProject.create(newWatermarkAssigner, project.getHints(), newProjects, 
project.getRowType());

Review comment:
       Ditto: `builder.project(...)` could be used here as well instead of `LogicalProject.create`.
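
       A sketch of the same simplification for the top-level projection (field names taken from the original project's row type):

```java
RelNode newProject = call.builder()
        .push(newWatermarkAssigner)
        .project(newProjects, project.getRowType().getFieldNames())
        .build();
```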




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

