This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c25870bb43f03941b91c5fd3bcdfd5e22b010ea4 Author: godfreyhe <[email protected]> AuthorDate: Mon Dec 21 15:50:20 2020 +0800 [FLINK-20690][table-planner-blink] Introduce StreamPhysicalCorrelate & StreamPhysicalPythonCorrelate, and make StreamExecCorrelate & StreamExecPythonCorrelate only extended from ExecNode This closes #14443 --- .../nodes/exec/common/CommonExecCorrelate.java | 102 +++++++++++++++++++++ .../nodes/exec/stream/StreamExecCorrelate.java | 59 ++++++++++++ .../logical/CalcPythonCorrelateTransposeRule.java | 10 +- .../rules/logical/PythonCorrelateSplitRule.java | 8 +- ...java => StreamPhysicalPythonCorrelateRule.java} | 24 ++--- .../planner/codegen/CorrelateCodeGenerator.scala | 41 +++------ .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 2 +- .../metadata/FlinkRelMdModifiedMonotonicity.scala | 2 +- .../plan/nodes/common/CommonPythonCorrelate.scala | 16 ++-- .../stream/StreamExecPythonCorrelate.scala | 72 ++++++--------- .../nodes/physical/batch/BatchExecCorrelate.scala | 13 +-- .../physical/batch/BatchExecPythonCorrelate.scala | 7 +- ...rrelate.scala => StreamPhysicalCorrelate.scala} | 55 ++++------- ...ase.scala => StreamPhysicalCorrelateBase.scala} | 7 +- ...e.scala => StreamPhysicalPythonCorrelate.scala} | 44 ++++----- .../FlinkChangelogModeInferenceProgram.scala | 6 +- .../planner/plan/rules/FlinkStreamRuleSets.scala | 6 +- .../SplitPythonConditionFromCorrelateRule.scala | 15 +-- ...eamPhysicalConstantTableFunctionScanRule.scala} | 18 ++-- ...ule.scala => StreamPhysicalCorrelateRule.scala} | 29 ++++-- 20 files changed, 320 insertions(+), 216 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java new file mode 100644 index 0000000..1ec1ccb --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecCorrelate.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.common; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.codegen.CodeGeneratorContext; +import org.apache.flink.table.planner.codegen.CorrelateCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Optional; + +/** + * Base {@link ExecNode} which matches along with join a Java/Scala user defined table function. + */ +public abstract class CommonExecCorrelate extends ExecNodeBase<RowData> { + private final FlinkJoinType joinType; + @Nullable + private final RexProgram project; + private final RexCall invocation; + @Nullable + private final RexNode condition; + private final Class<?> operatorBaseClass; + private final boolean retainHeader; + + public CommonExecCorrelate( + FlinkJoinType joinType, + @Nullable RexProgram project, + RexCall invocation, + @Nullable RexNode condition, + Class<?> operatorBaseClass, + boolean retainHeader, + ExecEdge inputEdge, + RowType outputType, + String description) { + super(Collections.singletonList(inputEdge), outputType, description); + this.joinType = joinType; + this.project = project; + this.invocation = invocation; + this.condition = condition; + this.operatorBaseClass = operatorBaseClass; + this.retainHeader = retainHeader; + } + + @SuppressWarnings("unchecked") + @Override + protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) { + final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0); + final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner); + final CodeGeneratorContext ctx = new CodeGeneratorContext(planner.getTableConfig()) + .setOperatorBaseClass(operatorBaseClass); + final Transformation<RowData> transform = CorrelateCodeGenerator.generateCorrelateTransformation( + planner.getTableConfig(), + ctx, + inputTransform, + (RowType) inputNode.getOutputType(), + JavaScalaConversionUtil.toScala(Optional.ofNullable(project)), + invocation, + JavaScalaConversionUtil.toScala(Optional.ofNullable(condition)), + (RowType) getOutputType(), + joinType, + inputTransform.getParallelism(), + retainHeader, + getClass().getSimpleName(), + getDesc()); + + if (inputsContainSingleton()) { + transform.setParallelism(1); + transform.setMaxParallelism(1); + } + return transform; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java new file mode 100644 index 0000000..5270af3 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCorrelate.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCorrelate; +import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator; +import org.apache.flink.table.runtime.operators.join.FlinkJoinType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexProgram; + +import javax.annotation.Nullable; + +/** + * Stream {@link ExecNode} which matches along with join a Java/Scala user defined table function. + */ +public class StreamExecCorrelate extends CommonExecCorrelate implements StreamExecNode<RowData> { + + public StreamExecCorrelate( + FlinkJoinType joinType, + @Nullable RexProgram project, + RexCall invocation, + @Nullable RexNode condition, + ExecEdge inputEdge, + RowType outputType, + String description) { + super( + joinType, + project, + invocation, + condition, + AbstractProcessStreamOperator.class, + true, + inputEdge, + outputType, + description); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRule.java index 5501949..701b140 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/CalcPythonCorrelateTransposeRule.java @@ -22,7 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRel; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; -import org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecCorrelateRule; +import org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule; import org.apache.flink.table.planner.plan.utils.PythonUtil; import org.apache.calcite.plan.RelOptRule; @@ -58,8 +58,8 @@ public class CalcPythonCorrelateTransposeRule extends RelOptRule { FlinkLogicalCorrelate correlate = call.rel(0); FlinkLogicalCalc right = call.rel(2); JoinRelType joinType = correlate.getJoinType(); - FlinkLogicalCalc mergedCalc = StreamExecCorrelateRule.getMergedCalc(right); - FlinkLogicalTableFunctionScan scan = StreamExecCorrelateRule.getTableScan(mergedCalc); + FlinkLogicalCalc mergedCalc = StreamPhysicalCorrelateRule.getMergedCalc(right); + FlinkLogicalTableFunctionScan scan = StreamPhysicalCorrelateRule.getTableScan(mergedCalc); return joinType == JoinRelType.INNER && PythonUtil.isPythonCall(scan.getCall(), null) && mergedCalc.getProgram().getCondition() != null; @@ -70,8 +70,8 @@ public class CalcPythonCorrelateTransposeRule extends RelOptRule { FlinkLogicalCorrelate correlate = call.rel(0); FlinkLogicalCalc right = call.rel(2); RexBuilder rexBuilder = call.builder().getRexBuilder(); - FlinkLogicalCalc mergedCalc = StreamExecCorrelateRule.getMergedCalc(right); - FlinkLogicalTableFunctionScan tableScan = StreamExecCorrelateRule.getTableScan(mergedCalc); + FlinkLogicalCalc mergedCalc = StreamPhysicalCorrelateRule.getMergedCalc(right); + FlinkLogicalTableFunctionScan tableScan = StreamPhysicalCorrelateRule.getTableScan(mergedCalc); RexProgram mergedCalcProgram = mergedCalc.getProgram(); InputRefRewriter inputRefRewriter = new InputRefRewriter( diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java index f935459..f23c503 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PythonCorrelateSplitRule.java @@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.rules.logical; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; -import org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecCorrelateRule; +import org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule; import org.apache.flink.table.planner.plan.utils.PythonUtil; import org.apache.calcite.plan.RelOptRule; @@ -88,7 +88,7 @@ public class PythonCorrelateSplitRule extends RelOptRule { if (right instanceof FlinkLogicalTableFunctionScan) { tableFunctionScan = (FlinkLogicalTableFunctionScan) right; } else if (right instanceof FlinkLogicalCalc) { - tableFunctionScan = StreamExecCorrelateRule.getTableScan((FlinkLogicalCalc) right); + tableFunctionScan = StreamPhysicalCorrelateRule.getTableScan((FlinkLogicalCalc) right); } else { return false; } @@ -237,8 +237,8 @@ public class PythonCorrelateSplitRule extends RelOptRule { scan.getCall())); } else { FlinkLogicalCalc calc = (FlinkLogicalCalc) right; - FlinkLogicalTableFunctionScan scan = StreamExecCorrelateRule.getTableScan(calc); - FlinkLogicalCalc mergedCalc = StreamExecCorrelateRule.getMergedCalc(calc); + FlinkLogicalTableFunctionScan scan = StreamPhysicalCorrelateRule.getTableScan(calc); + FlinkLogicalCalc mergedCalc = StreamPhysicalCorrelateRule.getMergedCalc(calc); FlinkLogicalTableFunctionScan newScan = createNewScan(scan, createScalarFunctionSplitter( null, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java similarity index 86% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java index 8c2b12e..b59bbf2 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonCorrelateRule.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonCorrelateRule.java @@ -22,7 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.FlinkConventions; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate; import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan; -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCorrelate; +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalPythonCorrelate; import org.apache.flink.table.planner.plan.utils.PythonUtil; import org.apache.calcite.plan.RelOptRule; @@ -38,15 +38,15 @@ import scala.Some; /** * The physical rule is responsible for convert {@link FlinkLogicalCorrelate} to - * {@link StreamExecPythonCorrelate}. + * {@link StreamPhysicalPythonCorrelate}. */ -public class StreamExecPythonCorrelateRule extends ConverterRule { +public class StreamPhysicalPythonCorrelateRule extends ConverterRule { - public static final RelOptRule INSTANCE = new StreamExecPythonCorrelateRule(); + public static final RelOptRule INSTANCE = new StreamPhysicalPythonCorrelateRule(); - private StreamExecPythonCorrelateRule() { + private StreamPhysicalPythonCorrelateRule() { super(FlinkLogicalCorrelate.class, FlinkConventions.LOGICAL(), FlinkConventions.STREAM_PHYSICAL(), - "StreamExecPythonCorrelateRule"); + "StreamPhysicalPythonCorrelateRule"); } // find only calc and table function @@ -85,7 +85,7 @@ public class StreamExecPythonCorrelateRule extends ConverterRule { } /** - * The factory is responsible for creating {@link StreamExecPythonCorrelate}. + * The factory is responsible for creating {@link StreamPhysicalPythonCorrelate}. */ private static class StreamExecPythonCorrelateFactory { private final FlinkLogicalCorrelate correlate; @@ -101,11 +101,11 @@ public class StreamExecPythonCorrelateRule extends ConverterRule { this.right = correlate.getInput(1); } - StreamExecPythonCorrelate convertToCorrelate() { + StreamPhysicalPythonCorrelate convertToCorrelate() { return convertToCorrelate(right, Option.empty()); } - private StreamExecPythonCorrelate convertToCorrelate( + private StreamPhysicalPythonCorrelate convertToCorrelate( RelNode relNode, Option<RexNode> condition) { if (relNode instanceof RelSubset) { @@ -113,14 +113,14 @@ public class StreamExecPythonCorrelateRule extends ConverterRule { return convertToCorrelate(rel.getRelList().get(0), condition); } else if (relNode instanceof FlinkLogicalCalc) { FlinkLogicalCalc calc = (FlinkLogicalCalc) relNode; - RelNode tableScan = StreamExecCorrelateRule.getTableScan(calc); - FlinkLogicalCalc newCalc = StreamExecCorrelateRule.getMergedCalc(calc); + RelNode tableScan = StreamPhysicalCorrelateRule.getTableScan(calc); + FlinkLogicalCalc newCalc = StreamPhysicalCorrelateRule.getMergedCalc(calc); return convertToCorrelate( tableScan, Some.apply(newCalc.getProgram().expandLocalRef(newCalc.getProgram().getCondition()))); } else { FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) relNode; - return new StreamExecPythonCorrelate( + return new StreamPhysicalPythonCorrelate( correlate.getCluster(), traitSet, convInput, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala index 1976237..000cefb 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala @@ -20,7 +20,6 @@ package org.apache.flink.table.planner.codegen import org.apache.flink.api.common.functions.Function import org.apache.flink.api.dag.Transformation -import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.table.api.{TableConfig, TableException, ValidationException} import org.apache.flink.table.data.utils.JoinedRowData import org.apache.flink.table.data.{GenericRowData, RowData} @@ -30,45 +29,37 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils._ import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction import org.apache.flink.table.planner.functions.utils.TableSqlFunction import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil -import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory +import org.apache.flink.table.runtime.operators.join.FlinkJoinType import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.runtime.util.StreamRecordCollector import org.apache.flink.table.types.logical.RowType -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.JoinRelType import org.apache.calcite.rex._ import scala.collection.JavaConversions._ object CorrelateCodeGenerator { - private[flink] def generateCorrelateTransformation( + def generateCorrelateTransformation( config: TableConfig, operatorCtx: CodeGeneratorContext, inputTransformation: Transformation[RowData], - inputRelType: RelDataType, + inputType: RowType, projectProgram: Option[RexProgram], - scan: FlinkLogicalTableFunctionScan, + invocation: RexCall, condition: Option[RexNode], - outRelType: RelDataType, - joinType: JoinRelType, + outputType: RowType, + joinType: FlinkJoinType, parallelism: Int, retainHeader: Boolean, - expression: (RexNode, List[String], Option[List[RexNode]]) => String, opName: String, transformationName: String) : Transformation[RowData] = { - val funcRel = scan.asInstanceOf[FlinkLogicalTableFunctionScan] - val rexCall = funcRel.getCall.asInstanceOf[RexCall] - val inputType = FlinkTypeFactory.toLogicalRowType(inputRelType) - val returnType = FlinkTypeFactory.toLogicalRowType(outRelType) - // according to the SQL standard, every scalar function should also be a table function // but we don't allow that for now - rexCall.getOperator match { + invocation.getOperator match { case func: BridgingSqlFunction if func.getDefinition.getKind == FunctionKind.TABLE => // ok case _: TableSqlFunction => // ok case f@_ => @@ -82,7 +73,7 @@ object CorrelateCodeGenerator { val selects = program.getProjectList.map(_.getIndex) val inputFieldCnt = program.getInputRowType.getFieldCount val swallowInputOnly = selects.head > inputFieldCnt && - (inputFieldCnt - outRelType.getFieldCount == inputRelType.getFieldCount) + (inputFieldCnt - outputType.getFieldCount == inputType.getFieldCount) // partial output or output right only swallowInputOnly } else { @@ -93,7 +84,7 @@ object CorrelateCodeGenerator { // adjust indicies of InputRefs to adhere to schema expected by generator val changeInputRefIndexShuttle = new RexShuttle { override def visitInputRef(inputRef: RexInputRef): RexNode = { - new RexInputRef(inputRelType.getFieldCount + inputRef.getIndex, inputRef.getType) + new RexInputRef(inputType.getFieldCount + inputRef.getIndex, inputRef.getType) } } @@ -104,18 +95,17 @@ object CorrelateCodeGenerator { projectProgram, swallowInputOnly, condition.map(_.accept(changeInputRefIndexShuttle)), - returnType, + outputType, joinType, - rexCall, + invocation, opName, - classOf[ProcessFunction[RowData, RowData]], retainHeader) ExecNodeUtil.createOneInputTransformation( inputTransformation, transformationName, substituteStreamOperator, - InternalTypeInfo.of(returnType), + InternalTypeInfo.of(outputType), parallelism, 0) } @@ -131,10 +121,9 @@ object CorrelateCodeGenerator { swallowInputOnly: Boolean = false, condition: Option[RexNode], returnType: RowType, - joinType: JoinRelType, + joinType: FlinkJoinType, rexCall: RexCall, ruleDescription: String, - functionClass: Class[T], retainHeader: Boolean = true) : CodeGenOperatorFactory[RowData] = { @@ -177,7 +166,7 @@ object CorrelateCodeGenerator { |""".stripMargin // 3. left join - if (joinType == JoinRelType.LEFT) { + if (joinType == FlinkJoinType.LEFT) { if (swallowInputOnly) { // and the returned row table function is empty, collect a null val nullRowTerm = CodeGenUtils.newName("nullRow") @@ -250,7 +239,7 @@ object CorrelateCodeGenerator { |""".stripMargin } - } else if (joinType != JoinRelType.INNER) { + } else if (joinType != FlinkJoinType.INNER) { throw new TableException(s"Unsupported JoinRelType: $joinType for correlate join.") } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala index ecd59ab..78a69f1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnUniqueness.scala @@ -628,7 +628,7 @@ class FlinkRelMdColumnUniqueness private extends MetadataHandler[BuiltInMetadata ignoreNulls: Boolean): JBoolean = null def areColumnsUnique( - rel: StreamExecCorrelate, + rel: StreamPhysicalCorrelate, mq: RelMetadataQuery, columns: ImmutableBitSet, ignoreNulls: Boolean): JBoolean = null diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala index 016ae28..250c169 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdModifiedMonotonicity.scala @@ -506,7 +506,7 @@ class FlinkRelMdModifiedMonotonicity private extends MetadataHandler[ModifiedMon mq: RelMetadataQuery): RelModifiedMonotonicity = null def getRelModifiedMonotonicity( - rel: StreamExecCorrelate, + rel: StreamPhysicalCorrelate, mq: RelMetadataQuery): RelModifiedMonotonicity = { getMonotonicity(rel.getInput(0), mq, rel.getRowType.getFieldCount) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala index 2534793..9f1cb92 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPythonCorrelate.scala @@ -18,21 +18,19 @@ package org.apache.flink.table.planner.plan.nodes.common -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode} import org.apache.flink.api.dag.Transformation import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.operators.OneInputStreamOperator import org.apache.flink.streaming.api.transformations.OneInputTransformation import org.apache.flink.table.data.RowData import org.apache.flink.table.functions.python.PythonFunctionInfo -import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate.PYTHON_TABLE_FUNCTION_OPERATOR_NAME -import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.flink.table.runtime.typeutils.InternalTypeInfo import org.apache.flink.table.types.logical.RowType +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode} + import scala.collection.mutable trait CommonPythonCorrelate extends CommonPythonBase { @@ -73,18 +71,16 @@ trait CommonPythonCorrelate extends CommonPythonBase { protected def createPythonOneInputTransformation( inputTransform: Transformation[RowData], - scan: FlinkLogicalTableFunctionScan, + pythonTableFuncRexCall: RexCall, name: String, - outputRowType: RelDataType, + outputRowType: RowType, config: Configuration, joinType: JoinRelType): OneInputTransformation[RowData, RowData] = { - val pythonTableFuncRexCall = scan.getCall.asInstanceOf[RexCall] val (pythonUdtfInputOffsets, pythonFunctionInfo) = extractPythonTableFunctionInfo(pythonTableFuncRexCall) val pythonOperatorInputRowType = inputTransform.getOutputType .asInstanceOf[InternalTypeInfo[RowData]] - val pythonOperatorOutputRowType = InternalTypeInfo.of( - FlinkTypeFactory.toLogicalType(outputRowType).asInstanceOf[RowType]) + val pythonOperatorOutputRowType = InternalTypeInfo.of(outputRowType) val pythonOperator = getPythonTableFunctionOperator( config, pythonOperatorInputRowType, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.scala similarity index 57% copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.scala index 3dc7699..6b7bc83 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonCorrelate.scala @@ -15,74 +15,54 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.table.planner.plan.nodes.physical.stream +package org.apache.flink.table.planner.plan.nodes.exec.stream import org.apache.flink.api.dag.Transformation import org.apache.flink.core.memory.ManagedMemoryUseCase import org.apache.flink.table.api.TableException import org.apache.flink.table.data.RowData -import org.apache.flink.table.planner.delegation.StreamPlanner +import org.apache.flink.table.planner.delegation.PlannerBase import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate -import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNodeBase} +import org.apache.flink.table.types.logical.RowType -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} -import org.apache.calcite.rel.RelNode -import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rex.{RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode} + +import java.util.Collections /** - * Flink RelNode which matches along with join a python user defined table function. - */ + * Stream exec node which matches along with join a python user defined table function. + * + * <p>Note: This class can't be ported to Java, + * because java class can't extend scala interface with default implementation. + * FLINK-20693 will port this class to Java. + * + * <p>TODO change JoinRelType to FlinkJoinType + */ class StreamExecPythonCorrelate( - cluster: RelOptCluster, - traitSet: RelTraitSet, - inputRel: RelNode, - projectProgram: Option[RexProgram], - scan: FlinkLogicalTableFunctionScan, + joinType: JoinRelType, + invocation: RexCall, condition: Option[RexNode], - outputRowType: RelDataType, - joinType: JoinRelType) - extends StreamExecCorrelateBase( - cluster, - traitSet, - inputRel, - projectProgram, - scan, - condition, - outputRowType, - joinType) + inputEdge: ExecEdge, + outputType: RowType, + description: String) + extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), outputType, description) + with StreamExecNode[RowData] with CommonPythonCorrelate { - if (condition.isDefined) { + if (joinType == JoinRelType.LEFT && condition.isDefined) { throw new TableException("Currently Python correlate does not support conditions in left join.") } - def copy( - traitSet: RelTraitSet, - newChild: RelNode, - projectProgram: Option[RexProgram], - outputType: RelDataType): RelNode = { - new StreamExecPythonCorrelate( - cluster, - traitSet, - newChild, - projectProgram, - scan, - condition, - outputType, - joinType) - } - - override protected def translateToPlanInternal( - planner: StreamPlanner): Transformation[RowData] = { + override protected def translateToPlanInternal(planner: PlannerBase): Transformation[RowData] = { val inputTransformation = getInputNodes.get(0).translateToPlan(planner) .asInstanceOf[Transformation[RowData]] val ret = createPythonOneInputTransformation( inputTransformation, - scan, + invocation, "StreamExecPythonCorrelate", - outputRowType, + getOutputType.asInstanceOf[RowType], getConfig(planner.getExecEnv, planner.getTableConfig), joinType) if (isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala index 1dec67d..405b918 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala @@ -19,15 +19,17 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.api.dag.Transformation import org.apache.flink.table.data.RowData +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CorrelateCodeGenerator} import org.apache.flink.table.planner.delegation.BatchPlanner import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan +import org.apache.flink.table.planner.plan.utils.JoinTypeUtil import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.{Correlate, JoinRelType} -import org.apache.calcite.rex.{RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} /** * Batch physical RelNode for [[Correlate]] (Java/Scala user defined table function). @@ -77,15 +79,14 @@ class BatchExecCorrelate( config, operatorCtx, inputTransformation, - input.getRowType, + FlinkTypeFactory.toLogicalRowType(input.getRowType), projectProgram, - scan, + scan.getCall.asInstanceOf[RexCall], condition, - outputRowType, - joinType, + FlinkTypeFactory.toLogicalRowType(outputRowType), + JoinTypeUtil.getFlinkJoinType(joinType), inputTransformation.getParallelism, retainHeader = false, - getExpressionString, "BatchExecCorrelate", getRelDetailedDescription) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala index 6ff8307..f9e6c29 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecPythonCorrelate.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.api.dag.Transformation import org.apache.flink.core.memory.ManagedMemoryUseCase import org.apache.flink.table.data.RowData +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.delegation.BatchPlanner import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan @@ -28,7 +29,7 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.{Correlate, JoinRelType} -import org.apache.calcite.rex.{RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} /** * Batch physical RelNode for [[Correlate]] (Python user defined table function). @@ -75,9 +76,9 @@ class BatchExecPythonCorrelate( .asInstanceOf[Transformation[RowData]] val ret = createPythonOneInputTransformation( inputTransformation, - scan, + scan.getCall.asInstanceOf[RexCall], "BatchExecPythonCorrelate", - outputRowType, + FlinkTypeFactory.toLogicalRowType(outputRowType), getConfig(planner.getExecEnv, planner.getTableConfig), joinType) if (isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) { diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala similarity index 56% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala index d18832b..70bf369 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelate.scala @@ -17,23 +17,22 @@ */ package org.apache.flink.table.planner.plan.nodes.physical.stream -import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.data.RowData -import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CorrelateCodeGenerator} -import org.apache.flink.table.planner.delegation.StreamPlanner +import org.apache.flink.table.planner.calcite.FlinkTypeFactory +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan -import org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator +import org.apache.flink.table.planner.plan.utils.JoinTypeUtil import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rex.{RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} /** - * Flink RelNode which matches along with join a Java/Scala user defined table function. - */ -class StreamExecCorrelate( + * Flink RelNode which matches along with join a Java/Scala user defined table function. + */ +class StreamPhysicalCorrelate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -42,7 +41,7 @@ class StreamExecCorrelate( condition: Option[RexNode], outputRowType: RelDataType, joinType: JoinRelType) - extends StreamExecCorrelateBase( + extends StreamPhysicalCorrelateBase( cluster, traitSet, inputRel, @@ -57,7 +56,7 @@ class StreamExecCorrelate( newChild: RelNode, projectProgram: Option[RexProgram], outputType: RelDataType): RelNode = { - new StreamExecCorrelate( + new StreamPhysicalCorrelate( cluster, traitSet, newChild, @@ -68,32 +67,14 @@ class StreamExecCorrelate( joinType) } - override protected def translateToPlanInternal( - planner: StreamPlanner): Transformation[RowData] = { - val tableConfig = planner.getTableConfig - val inputTransformation = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val operatorCtx = CodeGeneratorContext(tableConfig) - .setOperatorBaseClass(classOf[AbstractProcessStreamOperator[_]]) - val transform = CorrelateCodeGenerator.generateCorrelateTransformation( - tableConfig, - operatorCtx, - inputTransformation, - inputRel.getRowType, - projectProgram, - scan, - condition, - outputRowType, - joinType, - inputTransformation.getParallelism, - retainHeader = true, - getExpressionString, - "StreamExecCorrelate", + override def translateToExecNode(): ExecNode[_] = { + new StreamExecCorrelate( + JoinTypeUtil.getFlinkJoinType(joinType), + projectProgram.orNull, + scan.getCall.asInstanceOf[RexCall], + condition.orNull, + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), getRelDetailedDescription) - if (inputsContainSingleton()) { - transform.setParallelism(1) - transform.setMaxParallelism(1) - } - transform } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala similarity index 92% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelateBase.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala index b129e8e..0f0642c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelateBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalCorrelateBase.scala @@ -17,8 +17,6 @@ */ package org.apache.flink.table.planner.plan.nodes.physical.stream -import org.apache.flink.table.data.RowData -import org.apache.flink.table.planner.plan.nodes.exec.LegacyStreamExecNode import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.flink.table.planner.plan.utils.RelExplainUtil @@ -33,7 +31,7 @@ import scala.collection.JavaConversions._ /** * Base Flink RelNode which matches along with join a user defined table function. */ -abstract class StreamExecCorrelateBase( +abstract class StreamPhysicalCorrelateBase( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -43,8 +41,7 @@ abstract class StreamExecCorrelateBase( outputRowType: RelDataType, joinType: JoinRelType) extends SingleRel(cluster, traitSet, inputRel) - with StreamPhysicalRel - with LegacyStreamExecNode[RowData] { + with StreamPhysicalRel { require(joinType == JoinRelType.INNER || joinType == JoinRelType.LEFT) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala similarity index 64% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala index 3dc7699..9a29323 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonCorrelate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalPythonCorrelate.scala @@ -17,24 +17,22 @@ */ package org.apache.flink.table.planner.plan.nodes.physical.stream -import org.apache.flink.api.dag.Transformation -import org.apache.flink.core.memory.ManagedMemoryUseCase -import org.apache.flink.table.api.TableException -import org.apache.flink.table.data.RowData -import org.apache.flink.table.planner.delegation.StreamPlanner +import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.common.CommonPythonCorrelate +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate +import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode} import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType -import org.apache.calcite.rex.{RexNode, RexProgram} +import org.apache.calcite.rex.{RexCall, RexNode, RexProgram} /** * Flink RelNode which matches along with join a python user defined table function. */ -class StreamExecPythonCorrelate( +class StreamPhysicalPythonCorrelate( cluster: RelOptCluster, traitSet: RelTraitSet, inputRel: RelNode, @@ -43,7 +41,7 @@ class StreamExecPythonCorrelate( condition: Option[RexNode], outputRowType: RelDataType, joinType: JoinRelType) - extends StreamExecCorrelateBase( + extends StreamPhysicalCorrelateBase( cluster, traitSet, inputRel, @@ -54,16 +52,12 @@ class StreamExecPythonCorrelate( joinType) with CommonPythonCorrelate { - if (condition.isDefined) { - throw new TableException("Currently Python correlate does not support conditions in left join.") - } - def copy( traitSet: RelTraitSet, newChild: RelNode, projectProgram: Option[RexProgram], outputType: RelDataType): RelNode = { - new StreamExecPythonCorrelate( + new StreamPhysicalPythonCorrelate( cluster, traitSet, newChild, @@ -74,20 +68,14 @@ class StreamExecPythonCorrelate( joinType) } - override protected def translateToPlanInternal( - planner: StreamPlanner): Transformation[RowData] = { - val inputTransformation = getInputNodes.get(0).translateToPlan(planner) - .asInstanceOf[Transformation[RowData]] - val ret = createPythonOneInputTransformation( - inputTransformation, - scan, - "StreamExecPythonCorrelate", - outputRowType, - getConfig(planner.getExecEnv, planner.getTableConfig), - joinType) - if (isPythonWorkerUsingManagedMemory(planner.getTableConfig.getConfiguration)) { - ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON) - } - ret + override def translateToExecNode(): ExecNode[_] = { + new StreamExecPythonCorrelate( + joinType, + scan.getCall.asInstanceOf[RexCall], + condition, + ExecEdge.DEFAULT, + FlinkTypeFactory.toLogicalRowType(getRowType), + getRelDetailedDescription + ) } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index 0c46116..6b5643f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -271,8 +271,8 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val leftTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE) createNewNode(temporalJoin, children, leftTrait, requiredTrait, requester) - case _: StreamPhysicalCalcBase | _: StreamExecCorrelate | - _: StreamExecPythonCorrelate | _: StreamExecLookupJoin | _: StreamPhysicalExchange | + case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase | + _: StreamExecLookupJoin | _: StreamPhysicalExchange | _: StreamExecExpand | _: StreamExecMiniBatchAssigner | _: StreamPhysicalWatermarkAssigner => // transparent forward requiredTrait to children @@ -571,7 +571,7 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti } } - case _: StreamExecCorrelate | _: StreamExecPythonCorrelate | _: StreamExecLookupJoin | + case _: StreamPhysicalCorrelateBase | _: StreamExecLookupJoin | _: StreamPhysicalExchange | _: StreamExecExpand | _: StreamExecMiniBatchAssigner | _: StreamPhysicalWatermarkAssigner => // transparent forward requiredTrait to children diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala index 59948b0..12855b8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/FlinkStreamRuleSets.scala @@ -437,9 +437,9 @@ object FlinkStreamRuleSets { // CEP StreamExecMatchRule.INSTANCE, // correlate - StreamExecConstantTableFunctionScanRule.INSTANCE, - StreamExecCorrelateRule.INSTANCE, - StreamExecPythonCorrelateRule.INSTANCE, + StreamPhysicalConstantTableFunctionScanRule.INSTANCE, + StreamPhysicalCorrelateRule.INSTANCE, + StreamPhysicalPythonCorrelateRule.INSTANCE, // sink StreamExecSinkRule.INSTANCE, StreamExecLegacySinkRule.INSTANCE diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromCorrelateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromCorrelateRule.scala index 4806d40..9663152 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromCorrelateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/SplitPythonConditionFromCorrelateRule.scala @@ -18,14 +18,15 @@ package org.apache.flink.table.planner.plan.rules.logical +import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalRel, FlinkLogicalTableFunctionScan} +import org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule +import org.apache.flink.table.planner.plan.utils.PythonUtil.{containsPythonCall, isNonPythonCall} +import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor + import org.apache.calcite.plan.RelOptRule.{any, operand} import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil} import org.apache.calcite.rel.core.JoinRelType import org.apache.calcite.rex._ -import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalRel, FlinkLogicalTableFunctionScan} -import org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecCorrelateRule -import org.apache.flink.table.planner.plan.utils.PythonUtil.{containsPythonCall, isNonPythonCall} -import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -51,8 +52,8 @@ class SplitPythonConditionFromCorrelateRule val correlate: FlinkLogicalCorrelate = call.rel(0).asInstanceOf[FlinkLogicalCorrelate] val right: FlinkLogicalCalc = call.rel(2).asInstanceOf[FlinkLogicalCalc] val joinType: JoinRelType = correlate.getJoinType - val mergedCalc = StreamExecCorrelateRule.getMergedCalc(right) - val tableScan = StreamExecCorrelateRule + val mergedCalc = StreamPhysicalCorrelateRule.getMergedCalc(right) + val tableScan = StreamPhysicalCorrelateRule .getTableScan(mergedCalc) .asInstanceOf[FlinkLogicalTableFunctionScan] joinType == JoinRelType.INNER && @@ -66,7 +67,7 @@ class SplitPythonConditionFromCorrelateRule val correlate: FlinkLogicalCorrelate = call.rel(0).asInstanceOf[FlinkLogicalCorrelate] val right: FlinkLogicalCalc = call.rel(2).asInstanceOf[FlinkLogicalCalc] val rexBuilder = call.builder().getRexBuilder - val mergedCalc = StreamExecCorrelateRule.getMergedCalc(right) + val mergedCalc = StreamPhysicalCorrelateRule.getMergedCalc(right) val mergedCalcProgram = mergedCalc.getProgram val input = mergedCalc.getInput diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala similarity index 83% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala index 23eb212..ed9edbf 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecConstantTableFunctionScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalConstantTableFunctionScanRule.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.rules.physical.stream import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableFunctionScan -import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamExecCorrelate, StreamExecValues} +import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalCorrelate, StreamExecValues} import com.google.common.collect.ImmutableList import org.apache.calcite.plan.RelOptRule._ @@ -31,7 +31,7 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil} /** * Converts [[FlinkLogicalTableFunctionScan]] with constant RexCall to * {{{ - * [[StreamExecCorrelate]] + * [[StreamPhysicalCorrelate]] * / \ * empty [[StreamExecValues]] [[FlinkLogicalTableFunctionScan]] * }}} @@ -39,15 +39,15 @@ import org.apache.calcite.rex.{RexLiteral, RexUtil} * Add the rule to support select from a UDF directly, such as the following SQL: * SELECT * FROM LATERAL TABLE(func()) as T(c) * - * Note: [[StreamExecCorrelateRule]] is responsible for converting a reasonable physical plan for - * the normal correlate query, such as the following SQL: + * Note: [[StreamPhysicalCorrelateRule]] is responsible for converting a reasonable physical plan + * for the normal correlate query, such as the following SQL: * example1: SELECT * FROM T, LATERAL TABLE(func()) as T(c) * example2: SELECT a, c FROM T, LATERAL TABLE(func(a)) as T(c) */ -class StreamExecConstantTableFunctionScanRule +class StreamPhysicalConstantTableFunctionScanRule extends RelOptRule( operand(classOf[FlinkLogicalTableFunctionScan], any), - "StreamExecConstantTableFunctionScanRule") { + "StreamPhysicalConstantTableFunctionScanRule") { override def matches(call: RelOptRuleCall): Boolean = { val scan: FlinkLogicalTableFunctionScan = call.rel(0) @@ -66,7 +66,7 @@ class StreamExecConstantTableFunctionScanRule ImmutableList.of(ImmutableList.of[RexLiteral]()), cluster.getTypeFactory.createStructType(ImmutableList.of(), ImmutableList.of())) - val correlate = new StreamExecCorrelate( + val correlate = new StreamPhysicalCorrelate( cluster, traitSet, values, @@ -80,6 +80,6 @@ class StreamExecConstantTableFunctionScanRule } -object StreamExecConstantTableFunctionScanRule { - val INSTANCE = new StreamExecConstantTableFunctionScanRule +object StreamPhysicalConstantTableFunctionScanRule { + val INSTANCE = new StreamPhysicalConstantTableFunctionScanRule } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCorrelateRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala similarity index 89% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCorrelateRule.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala index 776d81a..182ebeb 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecCorrelateRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalCorrelateRule.scala @@ -17,31 +17,36 @@ */ package org.apache.flink.table.planner.plan.rules.physical.stream -import org.apache.calcite.plan.hep.HepRelVertex import org.apache.flink.table.api.TableException import org.apache.flink.table.planner.plan.nodes.FlinkConventions import org.apache.flink.table.planner.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalCorrelate, FlinkLogicalTableFunctionScan} -import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCorrelate -import org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecCorrelateRule.{getMergedCalc, getTableScan} +import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCorrelate +import org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalCorrelateRule.{getMergedCalc, getTableScan} +import org.apache.flink.table.planner.plan.utils.PythonUtil + +import org.apache.calcite.plan.hep.HepRelVertex import org.apache.calcite.plan.volcano.RelSubset import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rex.{RexNode, RexProgram, RexProgramBuilder} -import org.apache.flink.table.planner.plan.utils.PythonUtil -class StreamExecCorrelateRule +/** + * Rule that converts [[FlinkLogicalCorrelate]] to [[StreamPhysicalCorrelate]]. + */ +class StreamPhysicalCorrelateRule extends ConverterRule( classOf[FlinkLogicalCorrelate], FlinkConventions.LOGICAL, FlinkConventions.STREAM_PHYSICAL, - "StreamExecCorrelateRule") { + "StreamPhysicalCorrelateRule") { override def matches(call: RelOptRuleCall): Boolean = { val correlate: FlinkLogicalCorrelate = call.rel(0) val right = correlate.getRight.asInstanceOf[RelSubset].getOriginal // find only calc and table function + @scala.annotation.tailrec def findTableFunction(calc: FlinkLogicalCalc): Boolean = { val child = calc.getInput.asInstanceOf[RelSubset].getOriginal child match { @@ -67,7 +72,10 @@ class StreamExecCorrelateRule correlate.getInput(0), FlinkConventions.STREAM_PHYSICAL) val right: RelNode = correlate.getInput(1) - def convertToCorrelate(relNode: RelNode, condition: Option[RexNode]): StreamExecCorrelate = { + @scala.annotation.tailrec + def convertToCorrelate( + relNode: RelNode, + condition: Option[RexNode]): StreamPhysicalCorrelate = { relNode match { case rel: RelSubset => convertToCorrelate(rel.getRelList.get(0), condition) @@ -80,7 +88,7 @@ class StreamExecCorrelateRule Some(newCalc.getProgram.expandLocalRef(newCalc.getProgram.getCondition))) case scan: FlinkLogicalTableFunctionScan => - new StreamExecCorrelate( + new StreamPhysicalCorrelate( rel.getCluster, traitSet, convInput, @@ -96,8 +104,8 @@ class StreamExecCorrelateRule } -object StreamExecCorrelateRule { - val INSTANCE: RelOptRule = new StreamExecCorrelateRule +object StreamPhysicalCorrelateRule { + val INSTANCE: RelOptRule = new StreamPhysicalCorrelateRule def getMergedCalc(calc: FlinkLogicalCalc): FlinkLogicalCalc = { val child = calc.getInput match { @@ -122,6 +130,7 @@ object StreamExecCorrelateRule { } } + @scala.annotation.tailrec def getTableScan(calc: FlinkLogicalCalc): FlinkLogicalTableFunctionScan = { val child = calc.getInput match { case relSubset: RelSubset => relSubset.getOriginal
