This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d42aa454d54181e6a9102342ebc6d1668fe9bda9
Author: godfreyhe <[email protected]>
AuthorDate: Mon Dec 21 16:40:20 2020 +0800

    [FLINK-20690][table-planner-blink] Introduce BatchPhysicalCorrelate & 
BatchPhysicalPythonCorrelate, and make BatchExecCorrelate & 
BatchExecPythonCorrelate only extended from ExecNode
    
    This closes #14443
---
 .../plan/nodes/exec/batch/BatchExecCorrelate.java  | 58 +++++++++++++++++
 ....java => BatchPhysicalPythonCorrelateRule.java} | 20 +++---
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |  2 +-
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |  4 +-
 .../plan/metadata/FlinkRelMdUniqueGroups.scala     |  2 +-
 .../plan/metadata/FlinkRelMdUniqueKeys.scala       |  2 +-
 .../batch/BatchExecPythonCorrelate.scala           | 74 +++++++++-------------
 ...orrelate.scala => BatchPhysicalCorrelate.scala} | 38 ++++-------
 ...Base.scala => BatchPhysicalCorrelateBase.scala} | 13 +---
 ...te.scala => BatchPhysicalPythonCorrelate.scala} | 34 ++++------
 .../planner/plan/rules/FlinkBatchRuleSets.scala    |  6 +-
 ...tchPhysicalConstantTableFunctionScanRule.scala} | 18 +++---
 ...Rule.scala => BatchPhysicalCorrelateRule.scala} | 14 ++--
 .../metadata/MetadataHandlerConsistencyTest.scala  |  4 +-
 14 files changed, 150 insertions(+), 139 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
new file mode 100644
index 0000000..4495620
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.batch;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+
+import javax.annotation.Nullable;
+
+/**
+ * Batch exec node which matches along with join a Java/Scala user defined 
table function.
+ */
+public class BatchExecCorrelate extends CommonExecCorrelate implements 
BatchExecNode<RowData> {
+
+       public BatchExecCorrelate(
+                       FlinkJoinType joinType,
+                       @Nullable RexProgram project,
+                       RexCall invocation,
+                       @Nullable RexNode condition,
+                       ExecEdge inputEdge,
+                       RowType outputType,
+                       String description) {
+               super(
+                               joinType,
+                               project,
+                               invocation,
+                               condition,
+                               TableStreamOperator.class,
+                               false,
+                               inputEdge,
+                               outputType,
+                               description);
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
similarity index 88%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
rename to 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
index 4ecb1df..748b10d 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.table.planner.plan.nodes.FlinkConventions;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalPythonCorrelate;
 import org.apache.flink.table.planner.plan.utils.PythonUtil;
 
 import org.apache.calcite.plan.RelOptRule;
@@ -38,15 +38,15 @@ import scala.Some;
 
 /**
  * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
- * {@link BatchExecPythonCorrelate}.
+ * {@link BatchPhysicalPythonCorrelate}.
  */
-public class BatchExecPythonCorrelateRule extends ConverterRule {
+public class BatchPhysicalPythonCorrelateRule extends ConverterRule {
 
-       public static final RelOptRule INSTANCE = new 
BatchExecPythonCorrelateRule();
+       public static final RelOptRule INSTANCE = new 
BatchPhysicalPythonCorrelateRule();
 
-       private BatchExecPythonCorrelateRule() {
+       private BatchPhysicalPythonCorrelateRule() {
                super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.BATCH_PHYSICAL(),
-                       "BatchExecPythonCorrelateRule");
+                       "BatchPhysicalPythonCorrelateRule");
        }
 
        @Override
@@ -79,7 +79,7 @@ public class BatchExecPythonCorrelateRule extends 
ConverterRule {
        }
 
        /**
-        * The factory is responsible for creating {@link 
BatchExecPythonCorrelate}.
+        * The factory is responsible for creating {@link 
BatchPhysicalPythonCorrelate}.
         */
        private static class BatchExecPythonCorrelateFactory {
                private final FlinkLogicalCorrelate correlate;
@@ -95,11 +95,11 @@ public class BatchExecPythonCorrelateRule extends 
ConverterRule {
                        this.right = correlate.getInput(1);
                }
 
-               BatchExecPythonCorrelate convertToCorrelate() {
+               BatchPhysicalPythonCorrelate convertToCorrelate() {
                        return convertToCorrelate(right, Option.empty());
                }
 
-               private BatchExecPythonCorrelate convertToCorrelate(
+               private BatchPhysicalPythonCorrelate convertToCorrelate(
                        RelNode relNode,
                        Option<RexNode> condition) {
                        if (relNode instanceof RelSubset) {
@@ -112,7 +112,7 @@ public class BatchExecPythonCorrelateRule extends 
ConverterRule {
                                        
Some.apply(calc.getProgram().expandLocalRef(calc.getProgram().getCondition())));
                        } else {
                                FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) relNode;
-                               return new BatchExecPythonCorrelate(
+                               return new BatchPhysicalPythonCorrelate(
                                        correlate.getCluster(),
                                        traitSet,
                                        convInput,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
index 78a69f1..dde2d10 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -622,7 +622,7 @@ class FlinkRelMdColumnUniqueness private extends 
MetadataHandler[BuiltInMetadata
   }
 
   def areColumnsUnique(
-      rel: BatchExecCorrelate,
+      rel: BatchPhysicalCorrelate,
       mq: RelMetadataQuery,
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = null
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 250c169..60c2d1e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -25,7 +25,7 @@ import 
org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity
 import 
org.apache.flink.table.planner.plan.metadata.FlinkMetadata.ModifiedMonotonicity
 import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, 
TableAggregate, WindowAggregate, WindowTableAggregate}
 import org.apache.flink.table.planner.plan.nodes.logical._
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecCorrelate, 
BatchExecGroupAggregateBase}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecGroupAggregateBase,
 BatchPhysicalCorrelate}
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
 import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, 
TableSourceTable}
 import org.apache.flink.table.planner.plan.stats.{WithLower, WithUpper}
@@ -502,7 +502,7 @@ class FlinkRelMdModifiedMonotonicity private extends 
MetadataHandler[ModifiedMon
   }
 
   def getRelModifiedMonotonicity(
-      rel: BatchExecCorrelate,
+      rel: BatchPhysicalCorrelate,
       mq: RelMetadataQuery): RelModifiedMonotonicity = null
 
   def getRelModifiedMonotonicity(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala
index fa3c6d6..8a06bbb 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala
@@ -390,7 +390,7 @@ class FlinkRelMdUniqueGroups private extends 
MetadataHandler[UniqueGroups] {
       columns: ImmutableBitSet): ImmutableBitSet = columns
 
   def getUniqueGroups(
-      rel: BatchExecCorrelate,
+      rel: BatchPhysicalCorrelate,
       mq: RelMetadataQuery,
       columns: ImmutableBitSet): ImmutableBitSet = columns
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
index 25768e5..f68a349 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala
@@ -576,7 +576,7 @@ class FlinkRelMdUniqueKeys private extends 
MetadataHandler[BuiltInMetadata.Uniqu
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = null
 
   def getUniqueKeys(
-      rel: BatchExecCorrelate,
+      rel: BatchPhysicalCorrelate,
       mq: RelMetadataQuery,
       ignoreNulls: Boolean): util.Set[ImmutableBitSet] = null
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.scala
similarity index 51%
copy from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
copy to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.scala
index f9e6c29..5f6501e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.scala
@@ -15,70 +15,54 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.table.planner.plan.nodes.physical.batch
+package org.apache.flink.table.planner.plan.nodes.exec.batch
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.core.memory.ManagedMemoryUseCase
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.data.RowData
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.delegation.BatchPlanner
+import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate
-import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNodeBase}
+import org.apache.flink.table.types.logical.RowType
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.{Correlate, JoinRelType}
-import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexNode}
+
+import java.util.Collections
 
 /**
-  * Batch physical RelNode for [[Correlate]] (Python user defined table 
function).
-  */
+ * Batch exec node which matches along with join a python user defined table 
function.
+ *
+ * <p>Note: This class can't be ported to Java,
+ * because java class can't extend scala interface with default implementation.
+ * FLINK-20693 will port this class to Java.
+ *
+ * <p>TODO change JoinRelType to FlinkJoinType
+ */
 class BatchExecPythonCorrelate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputRel: RelNode,
-    scan: FlinkLogicalTableFunctionScan,
+    joinType: JoinRelType,
+    invocation: RexCall,
     condition: Option[RexNode],
-    projectProgram: Option[RexProgram],
-    outputRowType: RelDataType,
-    joinType: JoinRelType)
-  extends BatchExecCorrelateBase(
-    cluster,
-    traitSet,
-    inputRel,
-    scan,
-    condition,
-    projectProgram,
-    outputRowType,
-    joinType)
+    inputEdge: ExecEdge,
+    outputType: RowType,
+    description: String)
+  extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), 
outputType, description)
+  with BatchExecNode[RowData]
   with CommonPythonCorrelate {
 
-  def copy(
-      traitSet: RelTraitSet,
-      child: RelNode,
-      projectProgram: Option[RexProgram],
-      outputType: RelDataType): RelNode = {
-    new BatchExecPythonCorrelate(
-      cluster,
-      traitSet,
-      child,
-      scan,
-      condition,
-      projectProgram,
-      outputType,
-      joinType)
+  if (joinType == JoinRelType.LEFT && condition.isDefined) {
+    throw new TableException("Currently Python correlate does not support 
conditions in left join.")
   }
 
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
+  override protected def translateToPlanInternal(planner: PlannerBase): 
Transformation[RowData] = {
     val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
       .asInstanceOf[Transformation[RowData]]
     val ret = createPythonOneInputTransformation(
       inputTransformation,
-      scan.getCall.asInstanceOf[RexCall],
+      invocation,
       "BatchExecPythonCorrelate",
-      FlinkTypeFactory.toLogicalRowType(outputRowType),
+      getOutputType.asInstanceOf[RowType],
       getConfig(planner.getExecEnv, planner.getTableConfig),
       joinType)
     if 
(isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala
similarity index 69%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala
index 405b918..6107acb 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala
@@ -17,11 +17,9 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
CorrelateCodeGenerator}
-import org.apache.flink.table.planner.delegation.BatchPlanner
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
 import org.apache.flink.table.planner.plan.utils.JoinTypeUtil
 
@@ -34,7 +32,7 @@ import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
 /**
   * Batch physical RelNode for [[Correlate]] (Java/Scala user defined table 
function).
   */
-class BatchExecCorrelate(
+class BatchPhysicalCorrelate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -43,7 +41,7 @@ class BatchExecCorrelate(
     projectProgram: Option[RexProgram],
     outputRowType: RelDataType,
     joinType: JoinRelType)
-  extends BatchExecCorrelateBase(
+  extends BatchPhysicalCorrelateBase(
     cluster,
     traitSet,
     inputRel,
@@ -58,7 +56,7 @@ class BatchExecCorrelate(
       child: RelNode,
       projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode = {
-    new BatchExecCorrelate(
+    new BatchPhysicalCorrelate(
       cluster,
       traitSet,
       child,
@@ -69,26 +67,14 @@ class BatchExecCorrelate(
       joinType)
   }
 
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
-    val config = planner.getTableConfig
-    val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
-      .asInstanceOf[Transformation[RowData]]
-    val operatorCtx = CodeGeneratorContext(config)
-    CorrelateCodeGenerator.generateCorrelateTransformation(
-      config,
-      operatorCtx,
-      inputTransformation,
-      FlinkTypeFactory.toLogicalRowType(input.getRowType),
-      projectProgram,
-      scan.getCall.asInstanceOf[RexCall],
-      condition,
-      FlinkTypeFactory.toLogicalRowType(outputRowType),
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecCorrelate(
       JoinTypeUtil.getFlinkJoinType(joinType),
-      inputTransformation.getParallelism,
-      retainHeader = false,
-      "BatchExecCorrelate",
+      projectProgram.orNull,
+      scan.getCall.asInstanceOf[RexCall],
+      condition.orNull,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
   }
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala
similarity index 94%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelateBase.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala
index 3b98cd8..0462e30 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala
@@ -18,9 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
-import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, 
FlinkRelDistributionTraitDef, TraitUtil}
-import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode, 
ExecEdge}
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
 import org.apache.flink.table.planner.plan.utils.RelExplainUtil
 
@@ -32,14 +30,12 @@ import org.apache.calcite.rex.{RexCall, RexInputRef, 
RexNode, RexProgram}
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings}
 
-import java.util
-
 import scala.collection.JavaConversions._
 
 /**
   * Base Batch physical RelNode for [[Correlate]] (user defined table 
function).
   */
-abstract class BatchExecCorrelateBase(
+abstract class BatchPhysicalCorrelateBase(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -49,8 +45,7 @@ abstract class BatchExecCorrelateBase(
     outputRowType: RelDataType,
     joinType: JoinRelType)
   extends SingleRel(cluster, traitSet, inputRel)
-  with BatchPhysicalRel
-  with LegacyBatchExecNode[RowData] {
+  with BatchPhysicalRel {
 
   require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
 
@@ -153,8 +148,4 @@ abstract class BatchExecCorrelateBase(
     val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
     Some(copy(providedTraits, Seq(newInput)))
   }
-
-  //~ ExecNode methods 
-----------------------------------------------------------
-
-  override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT)
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
similarity index 70%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
index f9e6c29..1732472 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala
@@ -17,12 +17,10 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.core.memory.ManagedMemoryUseCase
-import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate
+import 
org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCorrelate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -34,7 +32,7 @@ import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
 /**
   * Batch physical RelNode for [[Correlate]] (Python user defined table 
function).
   */
-class BatchExecPythonCorrelate(
+class BatchPhysicalPythonCorrelate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -43,7 +41,7 @@ class BatchExecPythonCorrelate(
     projectProgram: Option[RexProgram],
     outputRowType: RelDataType,
     joinType: JoinRelType)
-  extends BatchExecCorrelateBase(
+  extends BatchPhysicalCorrelateBase(
     cluster,
     traitSet,
     inputRel,
@@ -59,7 +57,7 @@ class BatchExecPythonCorrelate(
       child: RelNode,
       projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode = {
-    new BatchExecPythonCorrelate(
+    new BatchPhysicalPythonCorrelate(
       cluster,
       traitSet,
       child,
@@ -70,20 +68,14 @@ class BatchExecPythonCorrelate(
       joinType)
   }
 
-  override protected def translateToPlanInternal(
-      planner: BatchPlanner): Transformation[RowData] = {
-    val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
-      .asInstanceOf[Transformation[RowData]]
-    val ret = createPythonOneInputTransformation(
-      inputTransformation,
+  override def translateToExecNode(): ExecNode[_] = {
+    new BatchExecPythonCorrelate(
+      joinType,
       scan.getCall.asInstanceOf[RexCall],
-      "BatchExecPythonCorrelate",
-      FlinkTypeFactory.toLogicalRowType(outputRowType),
-      getConfig(planner.getExecEnv, planner.getTableConfig),
-      joinType)
-    if 
(isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) {
-      ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
-    }
-    ret
+      condition,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
index c0c7a58..8adf7f5 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala
@@ -431,9 +431,9 @@ object FlinkBatchRuleSets {
     BatchExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
     BatchExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
     // correlate
-    BatchExecConstantTableFunctionScanRule.INSTANCE,
-    BatchExecCorrelateRule.INSTANCE,
-    BatchExecPythonCorrelateRule.INSTANCE,
+    BatchPhysicalConstantTableFunctionScanRule.INSTANCE,
+    BatchPhysicalCorrelateRule.INSTANCE,
+    BatchPhysicalPythonCorrelateRule.INSTANCE,
     // sink
     BatchExecSinkRule.INSTANCE,
     BatchExecLegacySinkRule.INSTANCE
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala
similarity index 84%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala
index 6a1ba07..fa83ab7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala
@@ -20,7 +20,7 @@ package 
org.apache.flink.table.planner.plan.rules.physical.batch
 
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecCorrelate, 
BatchExecValues}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCorrelate,
 BatchExecValues}
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptRule._
@@ -31,7 +31,7 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil}
 /**
   * Converts [[FlinkLogicalTableFunctionScan]] with constant RexCall to
   * {{{
-  *                    [[BatchExecCorrelate]]
+  *                    [[BatchPhysicalCorrelate]]
   *                          /       \
   * empty [[BatchExecValues]]  [[FlinkLogicalTableFunctionScan]]
   * }}}
@@ -39,15 +39,15 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil}
   * Add the rule to support select from a UDF directly, such as the following 
SQL:
   * SELECT * FROM LATERAL TABLE(func()) as T(c)
   *
-  * Note: [[BatchExecCorrelateRule]] is responsible for converting a 
reasonable physical plan for
-  * the normal correlate query, such as the following SQL:
+  * Note: [[BatchPhysicalCorrelateRule]] is responsible for converting a 
reasonable physical plan
+  * for the normal correlate query, such as the following SQL:
   * example1: SELECT * FROM T, LATERAL TABLE(func()) as T(c)
   * example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)
   */
-class BatchExecConstantTableFunctionScanRule
+class BatchPhysicalConstantTableFunctionScanRule
   extends RelOptRule(
     operand(classOf[FlinkLogicalTableFunctionScan], any),
-    "BatchExecTableFunctionScanRule") {
+    "BatchPhysicalConstantTableFunctionScanRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableFunctionScan = call.rel(0)
@@ -66,7 +66,7 @@ class BatchExecConstantTableFunctionScanRule
       ImmutableList.of(ImmutableList.of[RexLiteral]()),
       cluster.getTypeFactory.createStructType(ImmutableList.of(), 
ImmutableList.of()))
 
-    val correlate = new BatchExecCorrelate(
+    val correlate = new BatchPhysicalCorrelate(
       cluster,
       traitSet,
       values,
@@ -80,6 +80,6 @@ class BatchExecConstantTableFunctionScanRule
 
 }
 
-object BatchExecConstantTableFunctionScanRule {
-  val INSTANCE = new BatchExecConstantTableFunctionScanRule
+object BatchPhysicalConstantTableFunctionScanRule {
+  val INSTANCE = new BatchPhysicalConstantTableFunctionScanRule
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
similarity index 92%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
index 840ade1..bc94541 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala
@@ -20,7 +20,7 @@ package 
org.apache.flink.table.planner.plan.rules.physical.batch
 
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecCorrelate
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCorrelate
 import org.apache.flink.table.planner.plan.utils.PythonUtil
 
 import org.apache.calcite.plan.volcano.RelSubset
@@ -29,11 +29,11 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rex.RexNode
 
-class BatchExecCorrelateRule extends ConverterRule(
+class BatchPhysicalCorrelateRule extends ConverterRule(
   classOf[FlinkLogicalCorrelate],
   FlinkConventions.LOGICAL,
   FlinkConventions.BATCH_PHYSICAL,
-  "BatchExecCorrelateRule") {
+  "BatchPhysicalCorrelateRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val join = call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
@@ -60,7 +60,7 @@ class BatchExecCorrelateRule extends ConverterRule(
     val convInput: RelNode = RelOptRule.convert(join.getInput(0), 
FlinkConventions.BATCH_PHYSICAL)
     val right: RelNode = join.getInput(1)
 
-    def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): 
BatchExecCorrelate = {
+    def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): 
BatchPhysicalCorrelate = {
       relNode match {
         case rel: RelSubset =>
           convertToCorrelate(rel.getRelList.get(0), condition)
@@ -71,7 +71,7 @@ class BatchExecCorrelateRule extends ConverterRule(
             Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition)))
 
         case scan: FlinkLogicalTableFunctionScan =>
-          new BatchExecCorrelate(
+          new BatchPhysicalCorrelate(
             rel.getCluster,
             traitSet,
             convInput,
@@ -86,6 +86,6 @@ class BatchExecCorrelateRule extends ConverterRule(
   }
 }
 
-object BatchExecCorrelateRule {
-  val INSTANCE: RelOptRule = new BatchExecCorrelateRule
+object BatchPhysicalCorrelateRule {
+  val INSTANCE: RelOptRule = new BatchPhysicalCorrelateRule
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala
index 1176383..6eaadac 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.plan.metadata
 
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecCorrelate, 
BatchExecGroupAggregateBase}
+import 
org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecGroupAggregateBase,
 BatchPhysicalCorrelate}
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.{Aggregate, Correlate}
@@ -145,6 +145,6 @@ object MetadataHandlerConsistencyTest {
   def parameters(): util.Collection[Array[Any]] = {
     Seq[Array[Any]](
       Array(classOf[Aggregate], classOf[BatchExecGroupAggregateBase]),
-      Array(classOf[Correlate], classOf[BatchExecCorrelate]))
+      Array(classOf[Correlate], classOf[BatchPhysicalCorrelate]))
   }
 }

Reply via email to