This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 110a044  [FLINK-20745][table] Clean useless codes: Never push 
calcProgram to correlate
110a044 is described below

commit 110a0444f347e35fb1a77ea85d9f5f235461ad44
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Dec 24 11:26:30 2020 +0800

    [FLINK-20745][table] Clean useless codes: Never push calcProgram to 
correlate
    
    This closes #14474
---
 .../plan/nodes/exec/batch/BatchExecCorrelate.java  |   3 -
 .../nodes/exec/common/CommonExecCorrelate.java     |   6 -
 .../nodes/exec/stream/StreamExecCorrelate.java     |   3 -
 .../batch/BatchPhysicalPythonCorrelateRule.java    |   1 -
 .../stream/StreamPhysicalPythonCorrelateRule.java  |   1 -
 .../planner/codegen/CorrelateCodeGenerator.scala   | 185 +++------------------
 .../physical/batch/BatchPhysicalCorrelate.scala    |   7 +-
 .../batch/BatchPhysicalCorrelateBase.scala         |  34 +---
 .../batch/BatchPhysicalPythonCorrelate.scala       |   6 +-
 .../physical/stream/StreamPhysicalCorrelate.scala  |   7 +-
 .../stream/StreamPhysicalCorrelateBase.scala       |   6 +-
 .../stream/StreamPhysicalPythonCorrelate.scala     |   6 +-
 ...atchPhysicalConstantTableFunctionScanRule.scala |   1 -
 .../batch/BatchPhysicalCorrelateRule.scala         |   1 -
 ...reamPhysicalConstantTableFunctionScanRule.scala |   1 -
 .../stream/StreamPhysicalCorrelateRule.scala       |   1 -
 16 files changed, 34 insertions(+), 235 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
index 4495620..d146bbd 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
 
 import javax.annotation.Nullable;
 
@@ -38,7 +37,6 @@ public class BatchExecCorrelate extends CommonExecCorrelate 
implements BatchExec
 
        public BatchExecCorrelate(
                        FlinkJoinType joinType,
-                       @Nullable RexProgram project,
                        RexCall invocation,
                        @Nullable RexNode condition,
                        ExecEdge inputEdge,
@@ -46,7 +44,6 @@ public class BatchExecCorrelate extends CommonExecCorrelate 
implements BatchExec
                        String description) {
                super(
                                joinType,
-                               project,
                                invocation,
                                condition,
                                TableStreamOperator.class,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
index 1ec1ccb..35dd52a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
@@ -32,7 +32,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
 
 import javax.annotation.Nullable;
 
@@ -44,8 +43,6 @@ import java.util.Optional;
  */
 public abstract class CommonExecCorrelate extends ExecNodeBase<RowData> {
        private final FlinkJoinType joinType;
-       @Nullable
-       private final RexProgram project;
        private final RexCall invocation;
        @Nullable
        private final RexNode condition;
@@ -54,7 +51,6 @@ public abstract class CommonExecCorrelate extends 
ExecNodeBase<RowData> {
 
        public CommonExecCorrelate(
                        FlinkJoinType joinType,
-                       @Nullable RexProgram project,
                        RexCall invocation,
                        @Nullable RexNode condition,
                        Class<?> operatorBaseClass,
@@ -64,7 +60,6 @@ public abstract class CommonExecCorrelate extends 
ExecNodeBase<RowData> {
                        String description) {
                super(Collections.singletonList(inputEdge), outputType, 
description);
                this.joinType = joinType;
-               this.project = project;
                this.invocation = invocation;
                this.condition = condition;
                this.operatorBaseClass = operatorBaseClass;
@@ -83,7 +78,6 @@ public abstract class CommonExecCorrelate extends 
ExecNodeBase<RowData> {
                                ctx,
                                inputTransform,
                                (RowType) inputNode.getOutputType(),
-                               
JavaScalaConversionUtil.toScala(Optional.ofNullable(project)),
                                invocation,
                                
JavaScalaConversionUtil.toScala(Optional.ofNullable(condition)),
                                (RowType) getOutputType(),
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java
index 5270af3..3d86548 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.calcite.rex.RexCall;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexProgram;
 
 import javax.annotation.Nullable;
 
@@ -39,7 +38,6 @@ public class StreamExecCorrelate extends CommonExecCorrelate 
implements StreamEx
 
        public StreamExecCorrelate(
                        FlinkJoinType joinType,
-                       @Nullable RexProgram project,
                        RexCall invocation,
                        @Nullable RexNode condition,
                        ExecEdge inputEdge,
@@ -47,7 +45,6 @@ public class StreamExecCorrelate extends CommonExecCorrelate 
implements StreamEx
                        String description) {
                super(
                                joinType,
-                               project,
                                invocation,
                                condition,
                                AbstractProcessStreamOperator.class,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
index 748b10d..9467b6a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
@@ -118,7 +118,6 @@ public class BatchPhysicalPythonCorrelateRule extends 
ConverterRule {
                                        convInput,
                                        scan,
                                        condition,
-                                       null,
                                        correlate.getRowType(),
                                        correlate.getJoinType());
                        }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
index b59bbf2..d44fbee 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
@@ -124,7 +124,6 @@ public class StreamPhysicalPythonCorrelateRule extends 
ConverterRule {
                                        correlate.getCluster(),
                                        traitSet,
                                        convInput,
-                                       null,
                                        scan,
                                        condition,
                                        correlate.getRowType(),
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index 000cefb..79bf788 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -21,8 +21,8 @@ package org.apache.flink.table.planner.codegen
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.api.{TableConfig, TableException, 
ValidationException}
+import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.utils.JoinedRowData
-import org.apache.flink.table.data.{GenericRowData, RowData}
 import org.apache.flink.table.functions.FunctionKind
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
@@ -37,8 +37,6 @@ import org.apache.flink.table.types.logical.RowType
 
 import org.apache.calcite.rex._
 
-import scala.collection.JavaConversions._
-
 object CorrelateCodeGenerator {
 
   def generateCorrelateTransformation(
@@ -46,7 +44,6 @@ object CorrelateCodeGenerator {
       operatorCtx: CodeGeneratorContext,
       inputTransformation: Transformation[RowData],
       inputType: RowType,
-      projectProgram: Option[RexProgram],
       invocation: RexCall,
       condition: Option[RexNode],
       outputType: RowType,
@@ -68,19 +65,6 @@ object CorrelateCodeGenerator {
             s"Currently, only table functions can be used in a correlate 
operation.")
     }
 
-    val swallowInputOnly = if (projectProgram.isDefined) {
-      val program = projectProgram.get
-      val selects = program.getProjectList.map(_.getIndex)
-      val inputFieldCnt = program.getInputRowType.getFieldCount
-      val swallowInputOnly = selects.head > inputFieldCnt &&
-        (inputFieldCnt - outputType.getFieldCount == inputType.getFieldCount)
-      // partial output or output right only
-      swallowInputOnly
-    } else {
-      // completely output left input + right
-      false
-    }
-
     // adjust indicies of InputRefs to adhere to schema expected by generator
     val changeInputRefIndexShuttle = new RexShuttle {
       override def visitInputRef(inputRef: RexInputRef): RexNode = {
@@ -92,8 +76,6 @@ object CorrelateCodeGenerator {
       operatorCtx,
       config,
       inputType,
-      projectProgram,
-      swallowInputOnly,
       condition.map(_.accept(changeInputRefIndexShuttle)),
       outputType,
       joinType,
@@ -117,8 +99,6 @@ object CorrelateCodeGenerator {
       ctx: CodeGeneratorContext,
       config: TableConfig,
       inputType: RowType,
-      projectProgram: Option[RexProgram],
-      swallowInputOnly: Boolean = false,
       condition: Option[RexNode],
       returnType: RowType,
       joinType: FlinkJoinType,
@@ -136,8 +116,6 @@ object CorrelateCodeGenerator {
       ctx,
       config,
       inputType,
-      projectProgram,
-      swallowInputOnly,
       functionResultType,
       returnType,
       condition,
@@ -167,78 +145,27 @@ object CorrelateCodeGenerator {
 
     // 3. left join
     if (joinType == FlinkJoinType.LEFT) {
-      if (swallowInputOnly) {
-        // and the returned row table function is empty, collect a null
-        val nullRowTerm = CodeGenUtils.newName("nullRow")
-        ctx.addReusableOutputRecord(functionResultType, 
classOf[GenericRowData], nullRowTerm)
-        ctx.addReusableNullRow(nullRowTerm, functionResultType.getFieldCount)
-        val header = if (retainHeader) {
-          s"$nullRowTerm.setRowKind(${exprGenerator.input1Term}.getRowKind());"
-        } else {
-          ""
-        }
-        body +=
-          s"""
-             |boolean hasOutput = $correlateCollectorTerm.isCollected();
-             |if (!hasOutput) {
-             |  $header
-             |  $correlateCollectorTerm.outputResult($nullRowTerm);
-             |}
-             |""".stripMargin
-      } else if (projectProgram.isDefined) {
-        // output partial fields of left and right
-        val outputTerm = CodeGenUtils.newName("projectOut")
-        ctx.addReusableOutputRecord(returnType, classOf[GenericRowData], 
outputTerm)
-
-        val header = if (retainHeader) {
-          
s"$outputTerm.setRowKind(${CodeGenUtils.DEFAULT_INPUT1_TERM}.getRowKind());"
-        } else {
-          ""
-        }
-        val projectionExpression = generateProjectResultExpr(
-          ctx,
-          config,
-          inputType,
-          functionResultType,
-          udtfAlwaysNull = true,
-          returnType,
-          outputTerm,
-          projectProgram.get)
-
-        body +=
-          s"""
-             |boolean hasOutput = $correlateCollectorTerm.isCollected();
-             |if (!hasOutput) {
-             |  ${projectionExpression.code}
-             |  $header
-             |  $correlateCollectorTerm.outputResult($outputTerm);
-             |}
-             |""".stripMargin
-
+      // output all fields of left and right
+      // in case of left outer join and the returned row of table function is 
empty,
+      // fill all fields of row with null
+      val joinedRowTerm = CodeGenUtils.newName("joinedRow")
+      val nullRowTerm = CodeGenUtils.newName("nullRow")
+      ctx.addReusableOutputRecord(returnType, classOf[JoinedRowData], 
joinedRowTerm)
+      ctx.addReusableNullRow(nullRowTerm, functionResultType.getFieldCount)
+      val header = if (retainHeader) {
+        s"$joinedRowTerm.setRowKind(${exprGenerator.input1Term}.getRowKind());"
       } else {
-        // output all fields of left and right
-        // in case of left outer join and the returned row of table function 
is empty,
-        // fill all fields of row with null
-        val joinedRowTerm = CodeGenUtils.newName("joinedRow")
-        val nullRowTerm = CodeGenUtils.newName("nullRow")
-        ctx.addReusableOutputRecord(returnType, classOf[JoinedRowData], 
joinedRowTerm)
-        ctx.addReusableNullRow(nullRowTerm, functionResultType.getFieldCount)
-        val header = if (retainHeader) {
-          
s"$joinedRowTerm.setRowKind(${exprGenerator.input1Term}.getRowKind());"
-        } else {
-          ""
-        }
-        body +=
-          s"""
-             |boolean hasOutput = $correlateCollectorTerm.isCollected();
-             |if (!hasOutput) {
-             |  $joinedRowTerm.replace(${exprGenerator.input1Term}, 
$nullRowTerm);
-             |  $header
-             |  $correlateCollectorTerm.outputResult($joinedRowTerm);
-             |}
-             |""".stripMargin
-
-        }
+        ""
+      }
+      body +=
+        s"""
+           |boolean hasOutput = $correlateCollectorTerm.isCollected();
+           |if (!hasOutput) {
+           |  $joinedRowTerm.replace(${exprGenerator.input1Term}, 
$nullRowTerm);
+           |  $header
+           |  $correlateCollectorTerm.outputResult($joinedRowTerm);
+           |}
+           |""".stripMargin
     } else if (joinType != FlinkJoinType.INNER) {
       throw new TableException(s"Unsupported JoinRelType: $joinType for 
correlate join.")
     }
@@ -248,34 +175,6 @@ object CorrelateCodeGenerator {
     new CodeGenOperatorFactory(genOperator)
   }
 
-  private def generateProjectResultExpr(
-      ctx: CodeGeneratorContext,
-      config: TableConfig,
-      input1Type: RowType,
-      functionResultType: RowType,
-      udtfAlwaysNull: Boolean,
-      returnType: RowType,
-      outputTerm: String,
-      program: RexProgram): GeneratedExpression = {
-    val projectExprGenerator = new ExprCodeGenerator(ctx, udtfAlwaysNull)
-      .bindInput(input1Type, CodeGenUtils.DEFAULT_INPUT1_TERM)
-    if (udtfAlwaysNull) {
-      val udtfNullRow = CodeGenUtils.newName("udtfNullRow")
-      ctx.addReusableNullRow(udtfNullRow, functionResultType.getFieldCount)
-
-      projectExprGenerator.bindSecondInput(
-        functionResultType,
-        udtfNullRow)
-    } else {
-      projectExprGenerator.bindSecondInput(
-        functionResultType)
-    }
-    val projection = program.getProjectList.map(program.expandLocalRef)
-    val projectionExprs = 
projection.map(projectExprGenerator.generateExpression)
-    projectExprGenerator.generateResultExpression(
-      projectionExprs, returnType, classOf[GenericRowData], outputTerm)
-  }
-
   /**
    * Generates a collector that correlates input and converted table function 
results. Returns a
    * collector term for referencing the collector.
@@ -284,8 +183,6 @@ object CorrelateCodeGenerator {
       ctx: CodeGeneratorContext,
       config: TableConfig,
       inputType: RowType,
-      projectProgram: Option[RexProgram],
-      swallowInputOnly: Boolean,
       functionResultType: RowType,
       resultType: RowType,
       condition: Option[RexNode],
@@ -298,45 +195,7 @@ object CorrelateCodeGenerator {
 
     val collectorCtx = CodeGeneratorContext(config)
 
-    val body = if (projectProgram.isDefined) {
-      // partial output
-      if (swallowInputOnly) {
-        // output right only
-        val header = if (retainHeader) {
-          s"$udtfInputTerm.setRowKind($inputTerm.getRowKind());"
-        } else {
-          ""
-        }
-        s"""
-           |$header
-           |outputResult($udtfInputTerm);
-        """.stripMargin
-      } else {
-        val outputTerm = CodeGenUtils.newName("projectOut")
-        collectorCtx.addReusableOutputRecord(resultType, 
classOf[GenericRowData], outputTerm)
-
-        val header = if (retainHeader) {
-          s"$outputTerm.setRowKind($inputTerm.getRowKind());"
-        } else {
-          ""
-        }
-        val projectionExpression = generateProjectResultExpr(
-          collectorCtx,
-          config,
-          inputType,
-          functionResultType,
-          udtfAlwaysNull = false,
-          resultType,
-          outputTerm,
-          projectProgram.get)
-
-        s"""
-           |$header
-           |${projectionExpression.code}
-           |outputResult(${projectionExpression.resultTerm});
-        """.stripMargin
-      }
-    } else {
+    val body = {
       // completely output left input + right
       val joinedRowTerm = CodeGenUtils.newName("joinedRow")
       collectorCtx.addReusableOutputRecord(resultType, classOf[JoinedRowData], 
joinedRowTerm)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala
index 6107acb..2b2026c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{Correlate, JoinRelType}
-import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode}
 
 /**
   * Batch physical RelNode for [[Correlate]] (Java/Scala user defined table 
function).
@@ -38,7 +38,6 @@ class BatchPhysicalCorrelate(
     inputRel: RelNode,
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
-    projectProgram: Option[RexProgram],
     outputRowType: RelDataType,
     joinType: JoinRelType)
   extends BatchPhysicalCorrelateBase(
@@ -47,14 +46,12 @@ class BatchPhysicalCorrelate(
     inputRel,
     scan,
     condition,
-    projectProgram,
     outputRowType,
     joinType) {
 
   def copy(
       traitSet: RelTraitSet,
       child: RelNode,
-      projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode = {
     new BatchPhysicalCorrelate(
       cluster,
@@ -62,7 +59,6 @@ class BatchPhysicalCorrelate(
       child,
       scan,
       condition,
-      projectProgram,
       outputType,
       joinType)
   }
@@ -70,7 +66,6 @@ class BatchPhysicalCorrelate(
   override def translateToExecNode(): ExecNode[_] = {
     new BatchExecCorrelate(
       JoinTypeUtil.getFlinkJoinType(joinType),
-      projectProgram.orNull,
       scan.getCall.asInstanceOf[RexCall],
       condition.orNull,
       ExecEdge.DEFAULT,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala
index 0462e30..6d11af2 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala
@@ -26,8 +26,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelOptRule, 
RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{Correlate, JoinRelType}
 import org.apache.calcite.rel.{RelCollationTraitDef, RelDistribution, 
RelFieldCollation, RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram}
-import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
 
 import scala.collection.JavaConversions._
@@ -41,7 +40,6 @@ abstract class BatchPhysicalCorrelateBase(
     inputRel: RelNode,
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
-    projectProgram: Option[RexProgram],
     outputRowType: RelDataType,
     joinType: JoinRelType)
   extends SingleRel(cluster, traitSet, inputRel)
@@ -52,7 +50,7 @@ abstract class BatchPhysicalCorrelateBase(
   override def deriveRowType(): RelDataType = outputRowType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    copy(traitSet, inputs.get(0), projectProgram, outputRowType)
+    copy(traitSet, inputs.get(0), outputRowType)
   }
 
   /**
@@ -61,7 +59,6 @@ abstract class BatchPhysicalCorrelateBase(
   def copy(
       traitSet: RelTraitSet,
       child: RelNode,
-      projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode
 
   override def explainTerms(pw: RelWriter): RelWriter = {
@@ -85,30 +82,11 @@ abstract class BatchPhysicalCorrelateBase(
 
     def getOutputInputMapping: Mapping = {
       val inputFieldCnt = getInput.getRowType.getFieldCount
-      projectProgram match {
-        case Some(program) =>
-          val projects = program.getProjectList.map(program.expandLocalRef)
-          val mapping = Mappings.create(MappingType.INVERSE_FUNCTION, 
inputFieldCnt, projects.size)
-          projects.zipWithIndex.foreach {
-            case (project, index) =>
-              project match {
-                case inputRef: RexInputRef => mapping.set(inputRef.getIndex, 
index)
-                case call: RexCall if call.getKind == SqlKind.AS =>
-                  call.getOperands.head match {
-                    case inputRef: RexInputRef => 
mapping.set(inputRef.getIndex, index)
-                    case _ => // ignore
-                  }
-                case _ => // ignore
-              }
-          }
-          mapping.inverse()
-        case _ =>
-          val mapping = Mappings.create(MappingType.FUNCTION, inputFieldCnt, 
inputFieldCnt)
-          (0 until inputFieldCnt).foreach {
-            index => mapping.set(index, index)
-          }
-          mapping
+      val mapping = Mappings.create(MappingType.FUNCTION, inputFieldCnt, 
inputFieldCnt)
+      (0 until inputFieldCnt).foreach {
+        index => mapping.set(index, index)
       }
+      mapping
     }
 
     val mapping = getOutputInputMapping
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
index 1732472..c939583 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{Correlate, JoinRelType}
-import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode}
 
 /**
   * Batch physical RelNode for [[Correlate]] (Python user defined table 
function).
@@ -38,7 +38,6 @@ class BatchPhysicalPythonCorrelate(
     inputRel: RelNode,
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
-    projectProgram: Option[RexProgram],
     outputRowType: RelDataType,
     joinType: JoinRelType)
   extends BatchPhysicalCorrelateBase(
@@ -47,7 +46,6 @@ class BatchPhysicalPythonCorrelate(
     inputRel,
     scan,
     condition,
-    projectProgram,
     outputRowType,
     joinType)
   with CommonPythonCorrelate {
@@ -55,7 +53,6 @@ class BatchPhysicalPythonCorrelate(
   def copy(
       traitSet: RelTraitSet,
       child: RelNode,
-      projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode = {
     new BatchPhysicalPythonCorrelate(
       cluster,
@@ -63,7 +60,6 @@ class BatchPhysicalPythonCorrelate(
       child,
       scan,
       condition,
-      projectProgram,
       outputType,
       joinType)
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala
index 70bf369..7f1e992 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode}
 
 /**
  * Flink RelNode which matches along with join a Java/Scala user defined table 
function.
@@ -36,7 +36,6 @@ class StreamPhysicalCorrelate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
-    projectProgram: Option[RexProgram],
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
     outputRowType: RelDataType,
@@ -45,7 +44,6 @@ class StreamPhysicalCorrelate(
     cluster,
     traitSet,
     inputRel,
-    projectProgram,
     scan,
     condition,
     outputRowType,
@@ -54,13 +52,11 @@ class StreamPhysicalCorrelate(
   def copy(
       traitSet: RelTraitSet,
       newChild: RelNode,
-      projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode = {
     new StreamPhysicalCorrelate(
       cluster,
       traitSet,
       newChild,
-      projectProgram,
       scan,
       condition,
       outputType,
@@ -70,7 +66,6 @@ class StreamPhysicalCorrelate(
   override def translateToExecNode(): ExecNode[_] = {
     new StreamExecCorrelate(
       JoinTypeUtil.getFlinkJoinType(joinType),
-      projectProgram.orNull,
       scan.getCall.asInstanceOf[RexCall],
       condition.orNull,
       ExecEdge.DEFAULT,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala
index 0f0642c..a633924 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala
@@ -24,7 +24,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode}
 
 import scala.collection.JavaConversions._
 
@@ -35,7 +35,6 @@ abstract class StreamPhysicalCorrelateBase(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
-    val projectProgram: Option[RexProgram],
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
     outputRowType: RelDataType,
@@ -50,7 +49,7 @@ abstract class StreamPhysicalCorrelateBase(
   override def deriveRowType(): RelDataType = outputRowType
 
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
-    copy(traitSet, inputs.get(0), projectProgram, outputRowType)
+    copy(traitSet, inputs.get(0), outputRowType)
   }
 
   /**
@@ -59,7 +58,6 @@ abstract class StreamPhysicalCorrelateBase(
   def copy(
       traitSet: RelTraitSet,
       newChild: RelNode,
-      projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode
 
   override def explainTerms(pw: RelWriter): RelWriter = {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
index 9a29323..1e08b4d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode}
 
 /**
   * Flink RelNode which matches along with join a python user defined table 
function.
@@ -36,7 +36,6 @@ class StreamPhysicalPythonCorrelate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
-    projectProgram: Option[RexProgram],
     scan: FlinkLogicalTableFunctionScan,
     condition: Option[RexNode],
     outputRowType: RelDataType,
@@ -45,7 +44,6 @@ class StreamPhysicalPythonCorrelate(
     cluster,
     traitSet,
     inputRel,
-    projectProgram,
     scan,
     condition,
     outputRowType,
@@ -55,13 +53,11 @@ class StreamPhysicalPythonCorrelate(
   def copy(
       traitSet: RelTraitSet,
       newChild: RelNode,
-      projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode = {
     new StreamPhysicalPythonCorrelate(
       cluster,
       traitSet,
       newChild,
-      projectProgram,
       scan,
       condition,
       outputType,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala
index fa83ab7..15e001e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala
@@ -72,7 +72,6 @@ class BatchPhysicalConstantTableFunctionScanRule
       values,
       scan,
       None,
-      None,
       scan.getRowType,
       JoinRelType.INNER)
     call.transformTo(correlate)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
index bc94541..2c321e1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
@@ -77,7 +77,6 @@ class BatchPhysicalCorrelateRule extends ConverterRule(
             convInput,
             scan,
             condition,
-            None,
             rel.getRowType,
             join.getJoinType)
       }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
index ed9edbf..2d8dd19 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
@@ -70,7 +70,6 @@ class StreamPhysicalConstantTableFunctionScanRule
       cluster,
       traitSet,
       values,
-      None,
       scan,
       None,
       scan.getRowType,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
index 182ebeb..bc6972b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
@@ -92,7 +92,6 @@ class StreamPhysicalCorrelateRule
             rel.getCluster,
             traitSet,
             convInput,
-            None,
             scan,
             condition,
             rel.getRowType,

Reply via email to