This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d42aa454d54181e6a9102342ebc6d1668fe9bda9 Author: godfreyhe <[email protected]> AuthorDate: Mon Dec 21 16:40:20 2020 +0800 [FLINK-20690][table-planner-blink] Introduce BatchPhysicalCorrelate & BatchPhysicalPythonCorrelate, and make BatchExecCorrelate & BatchExecPythonCorrelate only extended from ExecNode This closes #14443 --- .../plan/nodes/exec/batch/BatchExecCorrelate.java | 58 +++++++++++++++++ ....java => BatchPhysicalPythonCorrelateRule.java} | 20 +++--- .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 2 +- .../metadata/FlinkRelMdModifiedMonotonicity.scala | 4 +- .../plan/metadata/FlinkRelMdUniqueGroups.scala | 2 +- .../plan/metadata/FlinkRelMdUniqueKeys.scala | 2 +- .../batch/BatchExecPythonCorrelate.scala | 74 +++++++++------------- ...orrelate.scala => BatchPhysicalCorrelate.scala} | 38 ++++------- ...Base.scala => BatchPhysicalCorrelateBase.scala} | 13 +--- ...te.scala => BatchPhysicalPythonCorrelate.scala} | 34 ++++------ .../planner/plan/rules/FlinkBatchRuleSets.scala | 6 +- ...tchPhysicalConstantTableFunctionScanRule.scala} | 18 +++--- ...Rule.scala => BatchPhysicalCorrelateRule.scala} | 14 ++-- .../metadata/MetadataHandlerConsistencyTest.scala | 4 +- 14 files changed, 150 insertions(+), 139 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java new file mode 100644 index 0000000..4495620 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCorrelate.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; + +import javax.annotation.Nullable; + +/** + * Batch exec node which matches along with join a Java/Scala user defined table function. + */ +public class BatchExecCorrelate extends CommonExecCorrelate implements BatchExecNode<RowData> { + + public BatchExecCorrelate( + FlinkJoinType joinType, + @Nullable RexProgram project, + RexCall invocation, + @Nullable RexNode condition, + ExecEdge inputEdge, + RowType outputType, + String description) { + super( + joinType, + project, + invocation, + condition, + TableStreamOperator.class, + false, + inputEdge, + outputType, + description); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java similarity index 88% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java index 4ecb1df..748b10d 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecPythonCorrelateRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonCorrelateRule.java @@ -22,7 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecPythonCorrelate; +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalPythonCorrelate; import org.apache.flink.table.planner.plan.utils.PythonUtil; import org.apache.calcite.plan.RelOptRule; @@ -38,15 +38,15 @@ import scala.Some; /** * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to - * {@link BatchExecPythonCorrelate}. + * {@link BatchPhysicalPythonCorrelate}. */ -public class BatchExecPythonCorrelateRule extends ConverterRule { +public class BatchPhysicalPythonCorrelateRule extends ConverterRule { - public static final RelOptRule INSTANCE = new BatchExecPythonCorrelateRule(); + public static final RelOptRule INSTANCE = new BatchPhysicalPythonCorrelateRule(); - private BatchExecPythonCorrelateRule() { + private BatchPhysicalPythonCorrelateRule() { super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.BATCH_PHYSICAL(), - "BatchExecPythonCorrelateRule"); + "BatchPhysicalPythonCorrelateRule"); } @Override @@ -79,7 +79,7 @@ public class BatchExecPythonCorrelateRule extends ConverterRule { } /** - * The factory is responsible for creating {@link BatchExecPythonCorrelate}. + * The factory is responsible for creating {@link BatchPhysicalPythonCorrelate}. */ private static class BatchExecPythonCorrelateFactory { private final FlinkLogicalCorrelate correlate; @@ -95,11 +95,11 @@ public class BatchExecPythonCorrelateRule extends ConverterRule { this.right = correlate.getInput(1); } - BatchExecPythonCorrelate convertToCorrelate() { + BatchPhysicalPythonCorrelate convertToCorrelate() { return convertToCorrelate(right, Option.empty()); } - private BatchExecPythonCorrelate convertToCorrelate( + private BatchPhysicalPythonCorrelate convertToCorrelate( RelNode relNode, Option<RexNode> condition) { if (relNode instanceof RelSubset) { @@ -112,7 +112,7 @@ public class BatchExecPythonCorrelateRule extends ConverterRule { Some.apply(calc.getProgram().expandLocalRef(calc.getProgram().getCondition()))); } else { FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) relNode; - return new BatchExecPythonCorrelate( + return new BatchPhysicalPythonCorrelate( correlate.getCluster(), traitSet, convInput, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala index 78a69f1..dde2d10 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala @@ -622,7 +622,7 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata } def areColumnsUnique( - rel: BatchExecCorrelate, + rel: BatchPhysicalCorrelate, mq: RelMetadataQuery, columns: ImmutableBitSet, ignoreNulls: Boolean): JBoolean = null diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala index 250c169..60c2d1e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.planner.plan.`trait`.RelModifiedMonotonicity import org.apache.flink.table.planner.plan.metadata.FlinkMetadata.ModifiedMonotonicity import org.apache.flink.table.planner.plan.nodes.calcite.{Expand, Rank, TableAggregate, WindowAggregate, WindowTableAggregate} import org.apache.flink.table.planner.plan.nodes.logical._ -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecCorrelate, BatchExecGroupAggregateBase} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecGroupAggregateBase, BatchPhysicalCorrelate} import org.apache.flink.table.planner.plan.nodes.physical.stream._ import org.apache.flink.table.planner.plan.schema.{FlinkPreparingTableBase, TableSourceTable} import org.apache.flink.table.planner.plan.stats.{WithLower, WithUpper} @@ -502,7 +502,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon } def getRelModifiedMonotonicity( - rel: BatchExecCorrelate, + rel: BatchPhysicalCorrelate, mq: RelMetadataQuery): RelModifiedMonotonicity = null def getRelModifiedMonotonicity( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala index fa3c6d6..8a06bbb 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueGroups.scala @@ -390,7 +390,7 @@ class FlinkRelMdUniqueGroups private extends MetadataHandler[UniqueGroups] { columns: ImmutableBitSet): ImmutableBitSet = columns def getUniqueGroups( - rel: BatchExecCorrelate, + rel: BatchPhysicalCorrelate, mq: RelMetadataQuery, columns: ImmutableBitSet): ImmutableBitSet = columns diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala index 25768e5..f68a349 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUniqueKeys.scala @@ -576,7 +576,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu ignoreNulls: Boolean): util.Set[ImmutableBitSet] = null def getUniqueKeys( - rel: BatchExecCorrelate, + rel: BatchPhysicalCorrelate, mq: RelMetadataQuery, ignoreNulls: Boolean): util.Set[ImmutableBitSet] = null diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.scala similarity index 51% copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.scala index f9e6c29..5f6501e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCorrelate.scala @@ -15,70 +15,54 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.table.planner.plan.nodes.physical.batch +package org.apache.flink.table.planner.plan.nodes.exec.batch import org.apache.flink.api.dag.Transformation import org.apache.flink.core.memory.ManagedMemoryUseCase +import org.apache.flink.table.api.TableException import org.apache.flink.table.data.RowData -import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.delegation.BatchPlanner +import org.apache.flink.table.planner.delegation.PlannerBase import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate -import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNodeBase} +import org.apache.flink.table.types.logical.RowType -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.{Correlate, JoinRelType} -import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex.{RexCall, RexNode} + +import java.util.Collections /** - * Batch physical RelNode for [[Correlate]] (Python user defined table function). - */ + * Batch exec node which matches along with join a python user defined table function. + * + * <p>Note: This class can't be ported to Java, + * because java class can't extend scala interface with default implementation. + * FLINK-20693 will port this class to Java. + * + * <p>TODO change JoinRelType to FlinkJoinType + */ class BatchExecPythonCorrelate( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputRel: RelNode, - scan: FlinkLogicalTableFunctionScan, + joinType: JoinRelType, + invocation: RexCall, condition: Option[RexNode], - projectProgram: Option[RexProgram], - outputRowType: RelDataType, - joinType: JoinRelType) - extends BatchExecCorrelateBase( - cluster, - traitSet, - inputRel, - scan, - condition, - projectProgram, - outputRowType, - joinType) + inputEdge: ExecEdge, + outputType: RowType, + description: String) + extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), outputType, description) + with BatchExecNode[RowData] with CommonPythonCorrelate { - def copy( - traitSet: RelTraitSet, - child: RelNode, - projectProgram: Option[RexProgram], - outputType: RelDataType): RelNode = { - new BatchExecPythonCorrelate( - cluster, - traitSet, - child, - scan, - condition, - projectProgram, - outputType, - joinType) + if (joinType == JoinRelType.LEFT && condition.isDefined) { + throw new TableException("Currently Python correlate does not support conditions in left join.") } - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { + override protected def translateToPlanInternal(planner: PlannerBase): Transformation[RowData] = { val inputTransformation = getInputNodes.get(0).translateToPlan(planner) .asInstanceOf[Transformation[RowData]] val ret = createPythonOneInputTransformation( inputTransformation, - scan.getCall.asInstanceOf[RexCall], + invocation, "BatchExecPythonCorrelate", - FlinkTypeFactory.toLogicalRowType(outputRowType), + getOutputType.asInstanceOf[RowType], getConfig(planner.getExecEnv, planner.getTableConfig), joinType) if (isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala similarity index 69% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala index 405b918..6107acb 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelate.scala @@ -17,11 +17,9 @@ */ package org.apache.flink.table.planner.plan.nodes.physical.batch -import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CorrelateCodeGenerator} -import org.apache.flink.table.planner.delegation.BatchPlanner +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCorrelate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.flink.table.planner.plan.utils.JoinTypeUtil @@ -34,7 +32,7 @@ import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} /** * Batch physical RelNode for [[Correlate]] (Java/Scala user defined table function). */ -class BatchExecCorrelate( +class BatchPhysicalCorrelate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -43,7 +41,7 @@ class BatchExecCorrelate( projectProgram: Option[RexProgram], outputRowType: RelDataType, joinType: JoinRelType) - extends BatchExecCorrelateBase( + extends BatchPhysicalCorrelateBase( cluster, traitSet, inputRel, @@ -58,7 +56,7 @@ class BatchExecCorrelate( child: RelNode, projectProgram: Option[RexProgram], outputType: RelDataType): RelNode = { - new BatchExecCorrelate( + new BatchPhysicalCorrelate( cluster, traitSet, child, @@ -69,26 +67,14 @@ class BatchExecCorrelate( joinType) } - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { - val config = planner.getTableConfig - val inputTransformation = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val operatorCtx = CodeGeneratorContext(config) - CorrelateCodeGenerator.generateCorrelateTransformation( - config, - operatorCtx, - inputTransformation, - FlinkTypeFactory.toLogicalRowType(input.getRowType), - projectProgram, - scan.getCall.asInstanceOf[RexCall], - condition, - FlinkTypeFactory.toLogicalRowType(outputRowType), + override def translateToExecNode(): ExecNode[_] = { + new BatchExecCorrelate( JoinTypeUtil.getFlinkJoinType(joinType), - inputTransformation.getParallelism, - retainHeader = false, - "BatchExecCorrelate", + projectProgram.orNull, + scan.getCall.asInstanceOf[RexCall], + condition.orNull, + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription) } - } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala similarity index 94% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelateBase.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala index 3b98cd8..0462e30 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalCorrelateBase.scala @@ -18,9 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch -import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil} -import org.apache.flink.table.planner.plan.nodes.exec.{LegacyBatchExecNode, ExecEdge} import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.flink.table.planner.plan.utils.RelExplainUtil @@ -32,14 +30,12 @@ import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexProgram} import org.apache.calcite.sql.SqlKind import org.apache.calcite.util.mapping.{Mapping, MappingType, Mappings} -import java.util - import scala.collection.JavaConversions._ /** * Base Batch physical RelNode for [[Correlate]] (user defined table function). */ -abstract class BatchExecCorrelateBase( +abstract class BatchPhysicalCorrelateBase( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -49,8 +45,7 @@ abstract class BatchExecCorrelateBase( outputRowType: RelDataType, joinType: JoinRelType) extends SingleRel(cluster, traitSet, inputRel) - with BatchPhysicalRel - with LegacyBatchExecNode[RowData] { + with BatchPhysicalRel { require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT) @@ -153,8 +148,4 @@ abstract class BatchExecCorrelateBase( val newInput = RelOptRule.convert(getInput, inputRequiredTraits) Some(copy(providedTraits, Seq(newInput))) } - - //~ ExecNode methods ----------------------------------------------------------- - - override def getInputEdges: util.List[ExecEdge] = List(ExecEdge.DEFAULT) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala similarity index 70% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala index f9e6c29..1732472 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonCorrelate.scala @@ -17,12 +17,10 @@ */ package org.apache.flink.table.planner.plan.nodes.physical.batch -import org.apache.flink.api.dag.Transformation -import org.apache.flink.core.memory.ManagedMemoryUseCase -import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.delegation.BatchPlanner import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonCorrelate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} @@ -34,7 +32,7 @@ import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} /** * Batch physical RelNode for [[Correlate]] (Python user defined table function). */ -class BatchExecPythonCorrelate( +class BatchPhysicalPythonCorrelate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -43,7 +41,7 @@ class BatchExecPythonCorrelate( projectProgram: Option[RexProgram], outputRowType: RelDataType, joinType: JoinRelType) - extends BatchExecCorrelateBase( + extends BatchPhysicalCorrelateBase( cluster, traitSet, inputRel, @@ -59,7 +57,7 @@ class BatchExecPythonCorrelate( child: RelNode, projectProgram: Option[RexProgram], outputType: RelDataType): RelNode = { - new BatchExecPythonCorrelate( + new BatchPhysicalPythonCorrelate( cluster, traitSet, child, @@ -70,20 +68,14 @@ class BatchExecPythonCorrelate( joinType) } - override protected def translateToPlanInternal( - planner: BatchPlanner): Transformation[RowData] = { - val inputTransformation = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val ret = createPythonOneInputTransformation( - inputTransformation, + override def translateToExecNode(): ExecNode[_] = { + new BatchExecPythonCorrelate( + joinType, scan.getCall.asInstanceOf[RexCall], - "BatchExecPythonCorrelate", - FlinkTypeFactory.toLogicalRowType(outputRowType), - getConfig(planner.getExecEnv, planner.getTableConfig), - joinType) - if (isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) { - ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON) - } - ret + condition, + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala index c0c7a58..8adf7f5 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkBatchRuleSets.scala @@ -431,9 +431,9 @@ object FlinkBatchRuleSets { BatchExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN, BatchExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN, // correlate - BatchExecConstantTableFunctionScanRule.INSTANCE, - BatchExecCorrelateRule.INSTANCE, - BatchExecPythonCorrelateRule.INSTANCE, + BatchPhysicalConstantTableFunctionScanRule.INSTANCE, + BatchPhysicalCorrelateRule.INSTANCE, + BatchPhysicalPythonCorrelateRule.INSTANCE, // sink BatchExecSinkRule.INSTANCE, BatchExecLegacySinkRule.INSTANCE diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala similarity index 84% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala index 6a1ba07..fa83ab7 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecConstantTableFunctionScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalConstantTableFunctionScanRule.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecCorrelate, BatchExecValues} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchPhysicalCorrelate, BatchExecValues} import com.google.common.collect.ImmutableList import org.apache.calcite.plan.RelOptRule._ @@ -31,7 +31,7 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil} /** * Converts [[FlinkLogicalTableFunctionScan]] with constant RexCall to * {{{ - * [[BatchExecCorrelate]] + * [[BatchPhysicalCorrelate]] * / \ * empty [[BatchExecValues]] [[FlinkLogicalTableFunctionScan]] * }}} @@ -39,15 +39,15 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil} * Add the rule to support select from a UDF directly, such as the following SQL: * SELECT * FROM LATERAL TABLE(func()) as T(c) * - * Note: [[BatchExecCorrelateRule]] is responsible for converting a reasonable physical plan for - * the normal correlate query, such as the following SQL: + * Note: [[BatchPhysicalCorrelateRule]] is responsible for converting a reasonable physical plan + * for the normal correlate query, such as the following SQL: * example1: SELECT * FROM T, LATERAL TABLE(func()) as T(c) * example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c) */ -class BatchExecConstantTableFunctionScanRule +class BatchPhysicalConstantTableFunctionScanRule extends RelOptRule( operand(classOf[FlinkLogicalTableFunctionScan], any), - "BatchExecTableFunctionScanRule") { + "BatchPhysicalConstantTableFunctionScanRule") { override def matches(call: RelOptRuleCall): Boolean = { val scan: FlinkLogicalTableFunctionScan = call.rel(0) @@ -66,7 +66,7 @@ class BatchExecConstantTableFunctionScanRule ImmutableList.of(ImmutableList.of[RexLiteral]()), cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of())) - val correlate = new BatchExecCorrelate( + val correlate = new BatchPhysicalCorrelate( cluster, traitSet, values, @@ -80,6 +80,6 @@ class BatchExecConstantTableFunctionScanRule } -object BatchExecConstantTableFunctionScanRule { - val INSTANCE = new BatchExecConstantTableFunctionScanRule +object BatchPhysicalConstantTableFunctionScanRule { + val INSTANCE = new BatchPhysicalConstantTableFunctionScanRule } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala similarity index 92% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala index 840ade1..bc94541 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchExecCorrelateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalCorrelateRule.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.batch import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan} -import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecCorrelate +import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalCorrelate import org.apache.flink.table.planner.plan.utils.PythonUtil import org.apache.calcite.plan.volcano.RelSubset @@ -29,11 +29,11 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rex.RexNode -class BatchExecCorrelateRule extends ConverterRule( +class BatchPhysicalCorrelateRule extends ConverterRule( classOf[FlinkLogicalCorrelate], FlinkConventions.LOGICAL, FlinkConventions.BATCH_PHYSICAL, - "BatchExecCorrelateRule") { + "BatchPhysicalCorrelateRule") { override def matches(call: RelOptRuleCall): Boolean = { val join = call.rel(0).asInstanceOf[FlinkLogicalCorrelate] @@ -60,7 +60,7 @@ class BatchExecCorrelateRule extends ConverterRule( val convInput: RelNode = RelOptRule.convert(join.getInput(0), FlinkConventions.BATCH_PHYSICAL) val right: RelNode = join.getInput(1) - def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): BatchExecCorrelate = { + def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): BatchPhysicalCorrelate = { relNode match { case rel: RelSubset => convertToCorrelate(rel.getRelList.get(0), condition) @@ -71,7 +71,7 @@ class BatchExecCorrelateRule extends ConverterRule( Some(calc.getProgram.expandLocalRef(calc.getProgram.getCondition))) case scan: FlinkLogicalTableFunctionScan => - new BatchExecCorrelate( + new BatchPhysicalCorrelate( rel.getCluster, traitSet, convInput, @@ -86,6 +86,6 @@ class BatchExecCorrelateRule extends ConverterRule( } } -object BatchExecCorrelateRule { - val INSTANCE: RelOptRule = new BatchExecCorrelateRule +object BatchPhysicalCorrelateRule { + val INSTANCE: RelOptRule = new BatchPhysicalCorrelateRule } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala index 1176383..6eaadac 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataHandlerConsistencyTest.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.plan.metadata -import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecCorrelate, BatchExecGroupAggregateBase} +import org.apache.flink.table.planner.plan.nodes.physical.batch.{BatchExecGroupAggregateBase, BatchPhysicalCorrelate} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.core.{Aggregate, Correlate} @@ -145,6 +145,6 @@ object MetadataHandlerConsistencyTest { def parameters(): util.Collection[Array[Any]] = { Seq[Array[Any]]( Array(classOf[Aggregate], classOf[BatchExecGroupAggregateBase]), - Array(classOf[Correlate], classOf[BatchExecCorrelate])) + Array(classOf[Correlate], classOf[BatchPhysicalCorrelate])) } }
