This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c25870bb43f03941b91c5fd3bcdfd5e22b010ea4
Author: godfreyhe <[email protected]>
AuthorDate: Mon Dec 21 15:50:20 2020 +0800

    [FLINK-20690][table-planner-blink] Introduce StreamPhysicalCorrelate & 
StreamPhysicalPythonCorrelate, and make StreamExecCorrelate & 
StreamExecPythonCorrelate extend only from ExecNode
    
    This closes #14443
---
 .../nodes/exec/common/CommonExecCorrelate.java     | 102 +++++++++++++++++++++
 .../nodes/exec/stream/StreamExecCorrelate.java     |  59 ++++++++++++
 .../logical/CalcPythonCorrelateTransposeRule.java  |  10 +-
 .../rules/logical/PythonCorrelateSplitRule.java    |   8 +-
 ...java => StreamPhysicalPythonCorrelateRule.java} |  24 ++---
 .../planner/codegen/CorrelateCodeGenerator.scala   |  41 +++------
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |   2 +-
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |   2 +-
 .../plan/nodes/common/CommonPythonCorrelate.scala  |  16 ++--
 .../stream/StreamExecPythonCorrelate.scala         |  72 ++++++---------
 .../nodes/physical/batch/BatchExecCorrelate.scala  |  13 +--
 .../physical/batch/BatchExecPythonCorrelate.scala  |   7 +-
 ...rrelate.scala => StreamPhysicalCorrelate.scala} |  55 ++++-------
 ...ase.scala => StreamPhysicalCorrelateBase.scala} |   7 +-
 ...e.scala => StreamPhysicalPythonCorrelate.scala} |  44 ++++-----
 .../FlinkChangelogModeInferenceProgram.scala       |   6 +-
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |   6 +-
 .../SplitPythonConditionFromCorrelateRule.scala    |  15 +--
 ...eamPhysicalConstantTableFunctionScanRule.scala} |  18 ++--
 ...ule.scala => StreamPhysicalCorrelateRule.scala} |  29 ++++--
 20 files changed, 320 insertions(+), 216 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
new file mode 100644
index 0000000..1ec1ccb
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.common;
+
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
+import org.apache.flink.table.planner.codegen.CorrelateCodeGenerator;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * Base {@link ExecNode} which matches along with join a Java/Scala user 
defined table function.
+ */
+public abstract class CommonExecCorrelate extends ExecNodeBase<RowData> {
+       private final FlinkJoinType joinType;
+       @Nullable
+       private final RexProgram project;
+       private final RexCall invocation;
+       @Nullable
+       private final RexNode condition;
+       private final Class<?> operatorBaseClass;
+       private final boolean retainHeader;
+
+       public CommonExecCorrelate(
+                       FlinkJoinType joinType,
+                       @Nullable RexProgram project,
+                       RexCall invocation,
+                       @Nullable RexNode condition,
+                       Class<?> operatorBaseClass,
+                       boolean retainHeader,
+                       ExecEdge inputEdge,
+                       RowType outputType,
+                       String description) {
+               super(Collections.singletonList(inputEdge), outputType, 
description);
+               this.joinType = joinType;
+               this.project = project;
+               this.invocation = invocation;
+               this.condition = condition;
+               this.operatorBaseClass = operatorBaseClass;
+               this.retainHeader = retainHeader;
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       protected Transformation<RowData> translateToPlanInternal(PlannerBase 
planner) {
+               final ExecNode<RowData> inputNode = (ExecNode<RowData>) 
getInputNodes().get(0);
+               final Transformation<RowData> inputTransform = 
inputNode.translateToPlan(planner);
+               final CodeGeneratorContext ctx = new 
CodeGeneratorContext(planner.getTableConfig())
+                               .setOperatorBaseClass(operatorBaseClass);
+               final Transformation<RowData> transform = 
CorrelateCodeGenerator.generateCorrelateTransformation(
+                               planner.getTableConfig(),
+                               ctx,
+                               inputTransform,
+                               (RowType) inputNode.getOutputType(),
+                               
JavaScalaConversionUtil.toScala(Optional.ofNullable(project)),
+                               invocation,
+                               
JavaScalaConversionUtil.toScala(Optional.ofNullable(condition)),
+                               (RowType) getOutputType(),
+                               joinType,
+                               inputTransform.getParallelism(),
+                               retainHeader,
+                               getClass().getSimpleName(),
+                               getDesc());
+
+               if (inputsContainSingleton()) {
+                       transform.setParallelism(1);
+                       transform.setMaxParallelism(1);
+               }
+               return transform;
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java
new file mode 100644
index 0000000..5270af3
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate;
+import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexProgram;
+
+import javax.annotation.Nullable;
+
+/**
+ * Stream {@link ExecNode} which matches along with join a Java/Scala user 
defined table function.
+ */
+public class StreamExecCorrelate extends CommonExecCorrelate implements 
StreamExecNode<RowData> {
+
+       public StreamExecCorrelate(
+                       FlinkJoinType joinType,
+                       @Nullable RexProgram project,
+                       RexCall invocation,
+                       @Nullable RexNode condition,
+                       ExecEdge inputEdge,
+                       RowType outputType,
+                       String description) {
+               super(
+                               joinType,
+                               project,
+                               invocation,
+                               condition,
+                               AbstractProcessStreamOperator.class,
+                               true,
+                               inputEdge,
+                               outputType,
+                               description);
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRule.java
index 5501949..701b140 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRule.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel;
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
-import 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecCorrelateRule;
+import 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule;
 import org.apache.flink.table.planner.plan.utils.PythonUtil;
 
 import org.apache.calcite.plan.RelOptRule;
@@ -58,8 +58,8 @@ public class CalcPythonCorrelateTransposeRule extends 
RelOptRule {
                FlinkLogicalCorrelate correlate = call.rel(0);
                FlinkLogicalCalc right = call.rel(2);
                JoinRelType joinType = correlate.getJoinType();
-               FlinkLogicalCalc mergedCalc = 
StreamExecCorrelateRule.getMergedCalc(right);
-               FlinkLogicalTableFunctionScan scan = 
StreamExecCorrelateRule.getTableScan(mergedCalc);
+               FlinkLogicalCalc mergedCalc = 
StreamPhysicalCorrelateRule.getMergedCalc(right);
+               FlinkLogicalTableFunctionScan scan = 
StreamPhysicalCorrelateRule.getTableScan(mergedCalc);
                return joinType == JoinRelType.INNER &&
                        PythonUtil.isPythonCall(scan.getCall(), null) &&
                        mergedCalc.getProgram().getCondition() != null;
@@ -70,8 +70,8 @@ public class CalcPythonCorrelateTransposeRule extends 
RelOptRule {
                FlinkLogicalCorrelate correlate = call.rel(0);
                FlinkLogicalCalc right = call.rel(2);
                RexBuilder rexBuilder = call.builder().getRexBuilder();
-               FlinkLogicalCalc mergedCalc = 
StreamExecCorrelateRule.getMergedCalc(right);
-               FlinkLogicalTableFunctionScan tableScan = 
StreamExecCorrelateRule.getTableScan(mergedCalc);
+               FlinkLogicalCalc mergedCalc = 
StreamPhysicalCorrelateRule.getMergedCalc(right);
+               FlinkLogicalTableFunctionScan tableScan = 
StreamPhysicalCorrelateRule.getTableScan(mergedCalc);
                RexProgram mergedCalcProgram = mergedCalc.getProgram();
 
                InputRefRewriter inputRefRewriter = new InputRefRewriter(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
index f935459..f23c503 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.rules.logical;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
-import 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecCorrelateRule;
+import 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule;
 import org.apache.flink.table.planner.plan.utils.PythonUtil;
 
 import org.apache.calcite.plan.RelOptRule;
@@ -88,7 +88,7 @@ public class PythonCorrelateSplitRule extends RelOptRule {
                if (right instanceof FlinkLogicalTableFunctionScan) {
                        tableFunctionScan = (FlinkLogicalTableFunctionScan) 
right;
                } else if (right instanceof FlinkLogicalCalc) {
-                       tableFunctionScan = 
StreamExecCorrelateRule.getTableScan((FlinkLogicalCalc) right);
+                       tableFunctionScan = 
StreamPhysicalCorrelateRule.getTableScan((FlinkLogicalCalc) right);
                } else {
                        return false;
                }
@@ -237,8 +237,8 @@ public class PythonCorrelateSplitRule extends RelOptRule {
                                        scan.getCall()));
                } else {
                        FlinkLogicalCalc calc = (FlinkLogicalCalc) right;
-                       FlinkLogicalTableFunctionScan scan = 
StreamExecCorrelateRule.getTableScan(calc);
-                       FlinkLogicalCalc mergedCalc = 
StreamExecCorrelateRule.getMergedCalc(calc);
+                       FlinkLogicalTableFunctionScan scan = 
StreamPhysicalCorrelateRule.getTableScan(calc);
+                       FlinkLogicalCalc mergedCalc = 
StreamPhysicalCorrelateRule.getMergedCalc(calc);
                        FlinkLogicalTableFunctionScan newScan = 
createNewScan(scan,
                                createScalarFunctionSplitter(
                                        null,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
similarity index 86%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java
rename to 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
index 8c2b12e..b59bbf2 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.table.planner.plan.nodes.FlinkConventions;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate;
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan;
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCorrelate;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalPythonCorrelate;
 import org.apache.flink.table.planner.plan.utils.PythonUtil;
 
 import org.apache.calcite.plan.RelOptRule;
@@ -38,15 +38,15 @@ import scala.Some;
 
 /**
  * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} 
to
- * {@link StreamExecPythonCorrelate}.
+ * {@link StreamPhysicalPythonCorrelate}.
  */
-public class StreamExecPythonCorrelateRule extends ConverterRule {
+public class StreamPhysicalPythonCorrelateRule extends ConverterRule {
 
-       public static final RelOptRule INSTANCE = new 
StreamExecPythonCorrelateRule();
+       public static final RelOptRule INSTANCE = new 
StreamPhysicalPythonCorrelateRule();
 
-       private StreamExecPythonCorrelateRule() {
+       private StreamPhysicalPythonCorrelateRule() {
                super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), 
FlinkConventions.STREAM_PHYSICAL(),
-                       "StreamExecPythonCorrelateRule");
+                       "StreamPhysicalPythonCorrelateRule");
        }
 
        // find only calc and table function
@@ -85,7 +85,7 @@ public class StreamExecPythonCorrelateRule extends 
ConverterRule {
        }
 
        /**
-        * The factory is responsible for creating {@link 
StreamExecPythonCorrelate}.
+        * The factory is responsible for creating {@link 
StreamPhysicalPythonCorrelate}.
         */
        private static class StreamExecPythonCorrelateFactory {
                private final FlinkLogicalCorrelate correlate;
@@ -101,11 +101,11 @@ public class StreamExecPythonCorrelateRule extends 
ConverterRule {
                        this.right = correlate.getInput(1);
                }
 
-               StreamExecPythonCorrelate convertToCorrelate() {
+               StreamPhysicalPythonCorrelate convertToCorrelate() {
                        return convertToCorrelate(right, Option.empty());
                }
 
-               private StreamExecPythonCorrelate convertToCorrelate(
+               private StreamPhysicalPythonCorrelate convertToCorrelate(
                        RelNode relNode,
                        Option<RexNode> condition) {
                        if (relNode instanceof RelSubset) {
@@ -113,14 +113,14 @@ public class StreamExecPythonCorrelateRule extends 
ConverterRule {
                                return 
convertToCorrelate(rel.getRelList().get(0), condition);
                        } else if (relNode instanceof FlinkLogicalCalc) {
                                FlinkLogicalCalc calc = (FlinkLogicalCalc) 
relNode;
-                               RelNode tableScan = 
StreamExecCorrelateRule.getTableScan(calc);
-                               FlinkLogicalCalc newCalc = 
StreamExecCorrelateRule.getMergedCalc(calc);
+                               RelNode tableScan = 
StreamPhysicalCorrelateRule.getTableScan(calc);
+                               FlinkLogicalCalc newCalc = 
StreamPhysicalCorrelateRule.getMergedCalc(calc);
                                return convertToCorrelate(
                                        tableScan,
                                        
Some.apply(newCalc.getProgram().expandLocalRef(newCalc.getProgram().getCondition())));
                        } else {
                                FlinkLogicalTableFunctionScan scan = 
(FlinkLogicalTableFunctionScan) relNode;
-                               return new StreamExecPythonCorrelate(
+                               return new StreamPhysicalPythonCorrelate(
                                        correlate.getCluster(),
                                        traitSet,
                                        convInput,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index 1976237..000cefb 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.codegen
 
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.table.api.{TableConfig, TableException, 
ValidationException}
 import org.apache.flink.table.data.utils.JoinedRowData
 import org.apache.flink.table.data.{GenericRowData, RowData}
@@ -30,45 +29,37 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.utils.TableSqlFunction
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil
-import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
 import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.runtime.util.StreamRecordCollector
 import org.apache.flink.table.types.logical.RowType
 
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rex._
 
 import scala.collection.JavaConversions._
 
 object CorrelateCodeGenerator {
 
-  private[flink] def generateCorrelateTransformation(
+  def generateCorrelateTransformation(
       config: TableConfig,
       operatorCtx: CodeGeneratorContext,
       inputTransformation: Transformation[RowData],
-      inputRelType: RelDataType,
+      inputType: RowType,
       projectProgram: Option[RexProgram],
-      scan: FlinkLogicalTableFunctionScan,
+      invocation: RexCall,
       condition: Option[RexNode],
-      outRelType: RelDataType,
-      joinType: JoinRelType,
+      outputType: RowType,
+      joinType: FlinkJoinType,
       parallelism: Int,
       retainHeader: Boolean,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String,
       opName: String,
       transformationName: String)
     : Transformation[RowData] = {
 
-    val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan]
-    val rexCall = funcRel.getCall.asInstanceOf[RexCall]
-    val inputType = FlinkTypeFactory.toLogicalRowType(inputRelType)
-    val returnType = FlinkTypeFactory.toLogicalRowType(outRelType)
-
     // according to the SQL standard, every scalar function should also be a 
table function
     // but we don't allow that for now
-    rexCall.getOperator match {
+    invocation.getOperator match {
       case func: BridgingSqlFunction if func.getDefinition.getKind == 
FunctionKind.TABLE => // ok
       case _: TableSqlFunction => // ok
       case f@_ =>
@@ -82,7 +73,7 @@ object CorrelateCodeGenerator {
       val selects = program.getProjectList.map(_.getIndex)
       val inputFieldCnt = program.getInputRowType.getFieldCount
       val swallowInputOnly = selects.head > inputFieldCnt &&
-        (inputFieldCnt - outRelType.getFieldCount == 
inputRelType.getFieldCount)
+        (inputFieldCnt - outputType.getFieldCount == inputType.getFieldCount)
       // partial output or output right only
       swallowInputOnly
     } else {
@@ -93,7 +84,7 @@ object CorrelateCodeGenerator {
     // adjust indicies of InputRefs to adhere to schema expected by generator
     val changeInputRefIndexShuttle = new RexShuttle {
       override def visitInputRef(inputRef: RexInputRef): RexNode = {
-        new RexInputRef(inputRelType.getFieldCount + inputRef.getIndex, 
inputRef.getType)
+        new RexInputRef(inputType.getFieldCount + inputRef.getIndex, 
inputRef.getType)
       }
     }
 
@@ -104,18 +95,17 @@ object CorrelateCodeGenerator {
       projectProgram,
       swallowInputOnly,
       condition.map(_.accept(changeInputRefIndexShuttle)),
-      returnType,
+      outputType,
       joinType,
-      rexCall,
+      invocation,
       opName,
-      classOf[ProcessFunction[RowData, RowData]],
       retainHeader)
 
     ExecNodeUtil.createOneInputTransformation(
       inputTransformation,
       transformationName,
       substituteStreamOperator,
-      InternalTypeInfo.of(returnType),
+      InternalTypeInfo.of(outputType),
       parallelism,
       0)
   }
@@ -131,10 +121,9 @@ object CorrelateCodeGenerator {
       swallowInputOnly: Boolean = false,
       condition: Option[RexNode],
       returnType: RowType,
-      joinType: JoinRelType,
+      joinType: FlinkJoinType,
       rexCall: RexCall,
       ruleDescription: String,
-      functionClass: Class[T],
       retainHeader: Boolean = true)
     : CodeGenOperatorFactory[RowData] = {
 
@@ -177,7 +166,7 @@ object CorrelateCodeGenerator {
          |""".stripMargin
 
     // 3. left join
-    if (joinType == JoinRelType.LEFT) {
+    if (joinType == FlinkJoinType.LEFT) {
       if (swallowInputOnly) {
         // and the returned row table function is empty, collect a null
         val nullRowTerm = CodeGenUtils.newName("nullRow")
@@ -250,7 +239,7 @@ object CorrelateCodeGenerator {
              |""".stripMargin
 
         }
-    } else if (joinType != JoinRelType.INNER) {
+    } else if (joinType != FlinkJoinType.INNER) {
       throw new TableException(s"Unsupported JoinRelType: $joinType for 
correlate join.")
     }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
index ecd59ab..78a69f1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala
@@ -628,7 +628,7 @@ class FlinkRelMdColumnUniqueness private extends 
MetadataHandler[BuiltInMetadata
       ignoreNulls: Boolean): JBoolean = null
 
   def areColumnsUnique(
-      rel: StreamExecCorrelate,
+      rel: StreamPhysicalCorrelate,
       mq: RelMetadataQuery,
       columns: ImmutableBitSet,
       ignoreNulls: Boolean): JBoolean = null
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
index 016ae28..250c169 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala
@@ -506,7 +506,7 @@ class FlinkRelMdModifiedMonotonicity private extends 
MetadataHandler[ModifiedMon
       mq: RelMetadataQuery): RelModifiedMonotonicity = null
 
   def getRelModifiedMonotonicity(
-      rel: StreamExecCorrelate,
+      rel: StreamPhysicalCorrelate,
       mq: RelMetadataQuery): RelModifiedMonotonicity = {
     getMonotonicity(rel.getInput(0), mq, rel.getRowType.getFieldCount)
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
index 2534793..9f1cb92 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala
@@ -18,21 +18,19 @@
 
 package org.apache.flink.table.planner.plan.nodes.common
 
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.functions.python.PythonFunctionInfo
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME
-import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
 import org.apache.flink.table.types.logical.RowType
 
+import org.apache.calcite.rel.core.JoinRelType
+import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
+
 import scala.collection.mutable
 
 trait CommonPythonCorrelate extends CommonPythonBase {
@@ -73,18 +71,16 @@ trait CommonPythonCorrelate extends CommonPythonBase {
 
   protected def createPythonOneInputTransformation(
       inputTransform: Transformation[RowData],
-      scan: FlinkLogicalTableFunctionScan,
+      pythonTableFuncRexCall: RexCall,
       name: String,
-      outputRowType: RelDataType,
+      outputRowType: RowType,
       config: Configuration,
       joinType: JoinRelType): OneInputTransformation[RowData, RowData] = {
-    val pythonTableFuncRexCall = scan.getCall.asInstanceOf[RexCall]
     val (pythonUdtfInputOffsets, pythonFunctionInfo) =
       extractPythonTableFunctionInfo(pythonTableFuncRexCall)
     val pythonOperatorInputRowType = inputTransform.getOutputType
       .asInstanceOf[InternalTypeInfo[RowData]]
-    val pythonOperatorOutputRowType = InternalTypeInfo.of(
-      FlinkTypeFactory.toLogicalType(outputRowType).asInstanceOf[RowType])
+    val pythonOperatorOutputRowType = InternalTypeInfo.of(outputRowType)
     val pythonOperator = getPythonTableFunctionOperator(
       config,
       pythonOperatorInputRowType,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.scala
similarity index 57%
copy from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala
copy to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.scala
index 3dc7699..6b7bc83 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.scala
@@ -15,74 +15,54 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.table.planner.plan.nodes.physical.stream
+package org.apache.flink.table.planner.plan.nodes.exec.stream
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.core.memory.ManagedMemoryUseCase
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.data.RowData
-import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate
-import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNodeBase}
+import org.apache.flink.table.types.logical.RowType
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode}
+
+import java.util.Collections
 
 /**
-  * Flink RelNode which matches along with join a python user defined table 
function.
-  */
+ * Stream exec node which matches along with join a python user defined table 
function.
+ *
+ * <p>Note: This class can't be ported to Java,
+ * because a Java class can't extend a Scala trait that has default method implementations.
+ * FLINK-20693 will port this class to Java.
+ *
+ * <p>TODO change JoinRelType to FlinkJoinType
+ */
 class StreamExecPythonCorrelate(
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputRel: RelNode,
-    projectProgram: Option[RexProgram],
-    scan: FlinkLogicalTableFunctionScan,
+    joinType: JoinRelType,
+    invocation: RexCall,
     condition: Option[RexNode],
-    outputRowType: RelDataType,
-    joinType: JoinRelType)
-  extends StreamExecCorrelateBase(
-    cluster,
-    traitSet,
-    inputRel,
-    projectProgram,
-    scan,
-    condition,
-    outputRowType,
-    joinType)
+    inputEdge: ExecEdge,
+    outputType: RowType,
+    description: String)
+  extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), 
outputType, description)
+  with StreamExecNode[RowData]
   with CommonPythonCorrelate {
 
-  if (condition.isDefined) {
+  if (joinType == JoinRelType.LEFT && condition.isDefined) {
     throw new TableException("Currently Python correlate does not support 
conditions in left join.")
   }
 
-  def copy(
-      traitSet: RelTraitSet,
-      newChild: RelNode,
-      projectProgram: Option[RexProgram],
-      outputType: RelDataType): RelNode = {
-    new StreamExecPythonCorrelate(
-      cluster,
-      traitSet,
-      newChild,
-      projectProgram,
-      scan,
-      condition,
-      outputType,
-      joinType)
-  }
-
-  override protected def translateToPlanInternal(
-      planner: StreamPlanner): Transformation[RowData] = {
+  override protected def translateToPlanInternal(planner: PlannerBase): 
Transformation[RowData] = {
     val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
       .asInstanceOf[Transformation[RowData]]
     val ret = createPythonOneInputTransformation(
       inputTransformation,
-      scan,
+      invocation,
       "StreamExecPythonCorrelate",
-      outputRowType,
+      getOutputType.asInstanceOf[RowType],
       getConfig(planner.getExecEnv, planner.getTableConfig),
       joinType)
     if 
(isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
index 1dec67d..405b918 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -19,15 +19,17 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
CorrelateCodeGenerator}
 import org.apache.flink.table.planner.delegation.BatchPlanner
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
+import org.apache.flink.table.planner.plan.utils.JoinTypeUtil
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{Correlate, JoinRelType}
-import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
 
 /**
   * Batch physical RelNode for [[Correlate]] (Java/Scala user defined table 
function).
@@ -77,15 +79,14 @@ class BatchExecCorrelate(
       config,
       operatorCtx,
       inputTransformation,
-      input.getRowType,
+      FlinkTypeFactory.toLogicalRowType(input.getRowType),
       projectProgram,
-      scan,
+      scan.getCall.asInstanceOf[RexCall],
       condition,
-      outputRowType,
-      joinType,
+      FlinkTypeFactory.toLogicalRowType(outputRowType),
+      JoinTypeUtil.getFlinkJoinType(joinType),
       inputTransformation.getParallelism,
       retainHeader = false,
-      getExpressionString,
       "BatchExecCorrelate",
       getRelDetailedDescription)
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
index 6ff8307..f9e6c29 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala
@@ -20,6 +20,7 @@ package 
org.apache.flink.table.planner.plan.nodes.physical.batch
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.core.memory.ManagedMemoryUseCase
 import org.apache.flink.table.data.RowData
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
@@ -28,7 +29,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.{Correlate, JoinRelType}
-import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
 
 /**
   * Batch physical RelNode for [[Correlate]] (Python user defined table 
function).
@@ -75,9 +76,9 @@ class BatchExecPythonCorrelate(
       .asInstanceOf[Transformation[RowData]]
     val ret = createPythonOneInputTransformation(
       inputTransformation,
-      scan,
+      scan.getCall.asInstanceOf[RexCall],
       "BatchExecPythonCorrelate",
-      outputRowType,
+      FlinkTypeFactory.toLogicalRowType(outputRowType),
       getConfig(planner.getExecEnv, planner.getTableConfig),
       joinType)
     if 
(isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) {
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala
similarity index 56%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala
index d18832b..70bf369 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala
@@ -17,23 +17,22 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.table.data.RowData
-import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
CorrelateCodeGenerator}
-import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
+import org.apache.flink.table.planner.plan.utils.JoinTypeUtil
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
 
 /**
-  * Flink RelNode which matches along with join a Java/Scala user defined 
table function.
-  */
-class StreamExecCorrelate(
+ * Flink RelNode which matches along with join a Java/Scala user defined table 
function.
+ */
+class StreamPhysicalCorrelate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -42,7 +41,7 @@ class StreamExecCorrelate(
     condition: Option[RexNode],
     outputRowType: RelDataType,
     joinType: JoinRelType)
-  extends StreamExecCorrelateBase(
+  extends StreamPhysicalCorrelateBase(
     cluster,
     traitSet,
     inputRel,
@@ -57,7 +56,7 @@ class StreamExecCorrelate(
       newChild: RelNode,
       projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode = {
-    new StreamExecCorrelate(
+    new StreamPhysicalCorrelate(
       cluster,
       traitSet,
       newChild,
@@ -68,32 +67,14 @@ class StreamExecCorrelate(
       joinType)
   }
 
-  override protected def translateToPlanInternal(
-      planner: StreamPlanner): Transformation[RowData] = {
-    val tableConfig = planner.getTableConfig
-    val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
-      .asInstanceOf[Transformation[RowData]]
-    val operatorCtx = CodeGeneratorContext(tableConfig)
-      .setOperatorBaseClass(classOf[AbstractProcessStreamOperator[_]])
-    val transform = CorrelateCodeGenerator.generateCorrelateTransformation(
-      tableConfig,
-      operatorCtx,
-      inputTransformation,
-      inputRel.getRowType,
-      projectProgram,
-      scan,
-      condition,
-      outputRowType,
-      joinType,
-      inputTransformation.getParallelism,
-      retainHeader = true,
-      getExpressionString,
-      "StreamExecCorrelate",
+  override def translateToExecNode(): ExecNode[_] = {
+    new StreamExecCorrelate(
+      JoinTypeUtil.getFlinkJoinType(joinType),
+      projectProgram.orNull,
+      scan.getCall.asInstanceOf[RexCall],
+      condition.orNull,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
       getRelDetailedDescription)
-    if (inputsContainSingleton()) {
-      transform.setParallelism(1)
-      transform.setMaxParallelism(1)
-    }
-    transform
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelateBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala
similarity index 92%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelateBase.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala
index b129e8e..0f0642c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelateBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala
@@ -17,8 +17,6 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
-import org.apache.flink.table.data.RowData
-import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
 import org.apache.flink.table.planner.plan.utils.RelExplainUtil
 
@@ -33,7 +31,7 @@ import scala.collection.JavaConversions._
 /**
   * Base Flink RelNode which matches along with join a user defined table 
function.
   */
-abstract class StreamExecCorrelateBase(
+abstract class StreamPhysicalCorrelateBase(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -43,8 +41,7 @@ abstract class StreamExecCorrelateBase(
     outputRowType: RelDataType,
     joinType: JoinRelType)
   extends SingleRel(cluster, traitSet, inputRel)
-  with StreamPhysicalRel
-  with LegacyStreamExecNode[RowData] {
+  with StreamPhysicalRel {
 
   require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT)
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
similarity index 64%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
index 3dc7699..9a29323 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala
@@ -17,24 +17,22 @@
  */
 package org.apache.flink.table.planner.plan.nodes.physical.stream
 
-import org.apache.flink.api.dag.Transformation
-import org.apache.flink.core.memory.ManagedMemoryUseCase
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.data.RowData
-import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode}
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
-import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.calcite.rex.{RexCall, RexNode, RexProgram}
 
 /**
   * Flink RelNode which matches along with join a python user defined table 
function.
   */
-class StreamExecPythonCorrelate(
+class StreamPhysicalPythonCorrelate(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     inputRel: RelNode,
@@ -43,7 +41,7 @@ class StreamExecPythonCorrelate(
     condition: Option[RexNode],
     outputRowType: RelDataType,
     joinType: JoinRelType)
-  extends StreamExecCorrelateBase(
+  extends StreamPhysicalCorrelateBase(
     cluster,
     traitSet,
     inputRel,
@@ -54,16 +52,12 @@ class StreamExecPythonCorrelate(
     joinType)
   with CommonPythonCorrelate {
 
-  if (condition.isDefined) {
-    throw new TableException("Currently Python correlate does not support 
conditions in left join.")
-  }
-
   def copy(
       traitSet: RelTraitSet,
       newChild: RelNode,
       projectProgram: Option[RexProgram],
       outputType: RelDataType): RelNode = {
-    new StreamExecPythonCorrelate(
+    new StreamPhysicalPythonCorrelate(
       cluster,
       traitSet,
       newChild,
@@ -74,20 +68,14 @@ class StreamExecPythonCorrelate(
       joinType)
   }
 
-  override protected def translateToPlanInternal(
-      planner: StreamPlanner): Transformation[RowData] = {
-    val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
-      .asInstanceOf[Transformation[RowData]]
-    val ret = createPythonOneInputTransformation(
-      inputTransformation,
-      scan,
-      "StreamExecPythonCorrelate",
-      outputRowType,
-      getConfig(planner.getExecEnv, planner.getTableConfig),
-      joinType)
-    if 
(isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) {
-      ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON)
-    }
-    ret
+  override def translateToExecNode(): ExecNode[_] = {
+    new StreamExecPythonCorrelate(
+      joinType,
+      scan.getCall.asInstanceOf[RexCall],
+      condition,
+      ExecEdge.DEFAULT,
+      FlinkTypeFactory.toLogicalRowType(getRowType),
+      getRelDetailedDescription
+    )
   }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 0c46116..6b5643f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -271,8 +271,8 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         val leftTrait = 
children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
         createNewNode(temporalJoin, children, leftTrait, requiredTrait, 
requester)
 
-      case _: StreamPhysicalCalcBase | _: StreamExecCorrelate |
-           _: StreamExecPythonCorrelate | _: StreamExecLookupJoin | _: 
StreamPhysicalExchange |
+      case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
+           _: StreamExecLookupJoin | _: StreamPhysicalExchange |
            _: StreamExecExpand | _: StreamExecMiniBatchAssigner |
            _: StreamPhysicalWatermarkAssigner =>
         // transparent forward requiredTrait to children
@@ -571,7 +571,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
           }
         }
 
-      case _: StreamExecCorrelate | _: StreamExecPythonCorrelate | _: 
StreamExecLookupJoin |
+      case _: StreamPhysicalCorrelateBase | _: StreamExecLookupJoin |
            _: StreamPhysicalExchange | _: StreamExecExpand | _: 
StreamExecMiniBatchAssigner |
            _: StreamPhysicalWatermarkAssigner =>
         // transparent forward requiredTrait to children
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
index 59948b0..12855b8 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala
@@ -437,9 +437,9 @@ object FlinkStreamRuleSets {
     // CEP
     StreamExecMatchRule.INSTANCE,
     // correlate
-    StreamExecConstantTableFunctionScanRule.INSTANCE,
-    StreamExecCorrelateRule.INSTANCE,
-    StreamExecPythonCorrelateRule.INSTANCE,
+    StreamPhysicalConstantTableFunctionScanRule.INSTANCE,
+    StreamPhysicalCorrelateRule.INSTANCE,
+    StreamPhysicalPythonCorrelateRule.INSTANCE,
     // sink
     StreamExecSinkRule.INSTANCE,
     StreamExecLegacySinkRule.INSTANCE
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromCorrelateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromCorrelateRule.scala
index 4806d40..9663152 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromCorrelateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromCorrelateRule.scala
@@ -18,14 +18,15 @@
 
 package org.apache.flink.table.planner.plan.rules.logical
 
+import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalCorrelate, FlinkLogicalRel, FlinkLogicalTableFunctionScan}
+import 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule
+import 
org.apache.flink.table.planner.plan.utils.PythonUtil.{containsPythonCall, 
isNonPythonCall}
+import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor
+
 import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil}
 import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rex._
-import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalCorrelate, FlinkLogicalRel, FlinkLogicalTableFunctionScan}
-import 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecCorrelateRule
-import 
org.apache.flink.table.planner.plan.utils.PythonUtil.{containsPythonCall, 
isNonPythonCall}
-import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -51,8 +52,8 @@ class SplitPythonConditionFromCorrelateRule
     val correlate: FlinkLogicalCorrelate = 
call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
     val right: FlinkLogicalCalc = call.rel(2).asInstanceOf[FlinkLogicalCalc]
     val joinType: JoinRelType = correlate.getJoinType
-    val mergedCalc = StreamExecCorrelateRule.getMergedCalc(right)
-    val tableScan = StreamExecCorrelateRule
+    val mergedCalc = StreamPhysicalCorrelateRule.getMergedCalc(right)
+    val tableScan = StreamPhysicalCorrelateRule
       .getTableScan(mergedCalc)
       .asInstanceOf[FlinkLogicalTableFunctionScan]
     joinType == JoinRelType.INNER &&
@@ -66,7 +67,7 @@ class SplitPythonConditionFromCorrelateRule
     val correlate: FlinkLogicalCorrelate = 
call.rel(0).asInstanceOf[FlinkLogicalCorrelate]
     val right: FlinkLogicalCalc = call.rel(2).asInstanceOf[FlinkLogicalCalc]
     val rexBuilder = call.builder().getRexBuilder
-    val mergedCalc = StreamExecCorrelateRule.getMergedCalc(right)
+    val mergedCalc = StreamPhysicalCorrelateRule.getMergedCalc(right)
     val mergedCalcProgram = mergedCalc.getProgram
     val input = mergedCalc.getInput
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
similarity index 83%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
index 23eb212..ed9edbf 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala
@@ -20,7 +20,7 @@ package 
org.apache.flink.table.planner.plan.rules.physical.stream
 
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecCorrelate, 
StreamExecValues}
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalCorrelate,
 StreamExecValues}
 
 import com.google.common.collect.ImmutableList
 import org.apache.calcite.plan.RelOptRule._
@@ -31,7 +31,7 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil}
 /**
   * Converts [[FlinkLogicalTableFunctionScan]] with constant RexCall to
   * {{{
-  *                    [[StreamExecCorrelate]]
+  *                    [[StreamPhysicalCorrelate]]
   *                          /       \
   * empty [[StreamExecValues]]  [[FlinkLogicalTableFunctionScan]]
   * }}}
@@ -39,15 +39,15 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil}
   * Add the rule to support select from a UDF directly, such as the following 
SQL:
   * SELECT * FROM LATERAL TABLE(func()) as T(c)
   *
-  * Note: [[StreamExecCorrelateRule]] is responsible for converting a 
reasonable physical plan for
-  * the normal correlate query, such as the following SQL:
+  * Note: [[StreamPhysicalCorrelateRule]] is responsible for converting a 
reasonable physical plan
+  * for the normal correlate query, such as the following SQL:
   * example1: SELECT * FROM T, LATERAL TABLE(func()) as T(c)
   * example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c)
   */
-class StreamExecConstantTableFunctionScanRule
+class StreamPhysicalConstantTableFunctionScanRule
   extends RelOptRule(
     operand(classOf[FlinkLogicalTableFunctionScan], any),
-    "StreamExecConstantTableFunctionScanRule") {
+    "StreamPhysicalConstantTableFunctionScanRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val scan: FlinkLogicalTableFunctionScan = call.rel(0)
@@ -66,7 +66,7 @@ class StreamExecConstantTableFunctionScanRule
       ImmutableList.of(ImmutableList.of[RexLiteral]()),
       cluster.getTypeFactory.createStructType(ImmutableList.of(), 
ImmutableList.of()))
 
-    val correlate = new StreamExecCorrelate(
+    val correlate = new StreamPhysicalCorrelate(
       cluster,
       traitSet,
       values,
@@ -80,6 +80,6 @@ class StreamExecConstantTableFunctionScanRule
 
 }
 
-object StreamExecConstantTableFunctionScanRule {
-  val INSTANCE = new StreamExecConstantTableFunctionScanRule
+object StreamPhysicalConstantTableFunctionScanRule {
+  val INSTANCE = new StreamPhysicalConstantTableFunctionScanRule
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCorrelateRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
similarity index 89%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCorrelateRule.scala
rename to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
index 776d81a..182ebeb 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCorrelateRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala
@@ -17,31 +17,36 @@
  */
 package org.apache.flink.table.planner.plan.rules.physical.stream
 
-import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, 
FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan}
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCorrelate
-import 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecCorrelateRule.{getMergedCalc,
 getTableScan}
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelate
+import 
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule.{getMergedCalc,
 getTableScan}
+import org.apache.flink.table.planner.plan.utils.PythonUtil
+
+import org.apache.calcite.plan.hep.HepRelVertex
 import org.apache.calcite.plan.volcano.RelSubset
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rex.{RexNode, RexProgram, RexProgramBuilder}
-import org.apache.flink.table.planner.plan.utils.PythonUtil
 
-class StreamExecCorrelateRule
+/**
+ * Rule that converts [[FlinkLogicalCorrelate]] to [[StreamPhysicalCorrelate]].
+ */
+class StreamPhysicalCorrelateRule
   extends ConverterRule(
     classOf[FlinkLogicalCorrelate],
     FlinkConventions.LOGICAL,
     FlinkConventions.STREAM_PHYSICAL,
-    "StreamExecCorrelateRule") {
+    "StreamPhysicalCorrelateRule") {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val correlate: FlinkLogicalCorrelate = call.rel(0)
     val right = correlate.getRight.asInstanceOf[RelSubset].getOriginal
 
     // find only calc and table function
+    @scala.annotation.tailrec
     def findTableFunction(calc: FlinkLogicalCalc): Boolean = {
       val child = calc.getInput.asInstanceOf[RelSubset].getOriginal
       child match {
@@ -67,7 +72,10 @@ class StreamExecCorrelateRule
       correlate.getInput(0), FlinkConventions.STREAM_PHYSICAL)
     val right: RelNode = correlate.getInput(1)
 
-    def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): 
StreamExecCorrelate = {
+    @scala.annotation.tailrec
+    def convertToCorrelate(
+        relNode: RelNode,
+        condition: Option[RexNode]): StreamPhysicalCorrelate = {
       relNode match {
         case rel: RelSubset =>
           convertToCorrelate(rel.getRelList.get(0), condition)
@@ -80,7 +88,7 @@ class StreamExecCorrelateRule
             
Some(newCalc.getProgram.expandLocalRef(newCalc.getProgram.getCondition)))
 
         case scan: FlinkLogicalTableFunctionScan =>
-          new StreamExecCorrelate(
+          new StreamPhysicalCorrelate(
             rel.getCluster,
             traitSet,
             convInput,
@@ -96,8 +104,8 @@ class StreamExecCorrelateRule
 
 }
 
-object StreamExecCorrelateRule {
-  val INSTANCE: RelOptRule = new StreamExecCorrelateRule
+object StreamPhysicalCorrelateRule {
+  val INSTANCE: RelOptRule = new StreamPhysicalCorrelateRule
 
   def getMergedCalc(calc: FlinkLogicalCalc): FlinkLogicalCalc = {
     val child = calc.getInput match {
@@ -122,6 +130,7 @@ object StreamExecCorrelateRule {
     }
   }
 
+  @scala.annotation.tailrec
   def getTableScan(calc: FlinkLogicalCalc): FlinkLogicalTableFunctionScan = {
     val child = calc.getInput match {
       case relSubset: RelSubset => relSubset.getOriginal

Reply via email to