godfreyhe commented on a change in pull request #13742:
URL: https://github.com/apache/flink/pull/13742#discussion_r510718313
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
##########
@@ -26,20 +26,20 @@ import
org.apache.flink.table.operations.{CatalogSinkModifyOperation, ModifyOper
import org.apache.flink.table.planner.operations.PlannerQueryOperation
import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext
+import org.apache.flink.table.planner.plan.nodes.process.{DAGProcessContext,
DAGProcessor}
import
org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer,
Optimizer}
-import org.apache.flink.table.planner.plan.reuse.DeadlockBreakupProcessor
+import org.apache.flink.table.planner.plan.reuse.{DeadlockBreakupProcessor,
MultipleInputNodeCreationProcessor}
import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper,
FlinkRelOptUtil}
import org.apache.flink.table.planner.sinks.{BatchSelectTableSink,
SelectTableSinkBase}
import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment,
ExecutorUtils, PlanUtil}
-
import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef}
import org.apache.calcite.rel.logical.LogicalTableModify
import org.apache.calcite.rel.{RelCollationTraitDef, RelNode}
import org.apache.calcite.sql.SqlExplainLevel
-
import java.util
+import org.apache.flink.table.api.config.OptimizerConfigOptions
Review comment:
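Please reorder the imports: the newly added `OptimizerConfigOptions` import is placed after `java.util` instead of alongside the other `org.apache.flink` imports.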
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
##########
@@ -99,4 +99,12 @@
key("table.optimizer.join-reorder-enabled")
.defaultValue(false)
.withDescription("Enables join reorder in optimizer.
Default is disabled.");
+
+ @Documentation.TableOption(execMode =
Documentation.ExecMode.BATCH_STREAMING)
+ public static final ConfigOption<Boolean>
TABLE_OPTIMIZER_MULTIPLE_INPUT_ENABLED =
+ key("table.optimizer.multiple-input-enabled")
+ .defaultValue(false)
+ .withDescription("Enables creating multiple input nodes
to reduce shuffling. " +
Review comment:
In the description, "nodes" -> "operators" (i.e. "multiple input operators").
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organize {@link ExecNode}s into multiple input
nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+ private final boolean isStreaming;
+
+ public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+ this.isStreaming = isStreaming;
+ }
+
+ @Override
+ public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context) {
+ if (!isStreaming) {
+ // As multiple input nodes use function call to deliver
records between sub-operators,
+ // we cannot rely on network buffers to buffer records
not yet ready to be read,
+ // so only BLOCKING dam behavior is safe here.
+ // If conflict is detected under this stricter
constraint,
+ // we add a PIPELINED exchange to mark that its input
and output node cannot be merged
+ // into the same multiple input node
+ InputPriorityConflictResolver resolver = new
InputPriorityConflictResolver(
+ sinkNodes,
+ Collections.emptySet(),
+ ExecEdge.DamBehavior.BLOCKING,
+ ShuffleMode.PIPELINED);
+ resolver.detectAndResolve();
+ }
+
+ List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+ // sort all nodes in topological order, sinks come first and
sources come last
+ List<ExecNodeWrapper> orderedWrappers =
topologicalSort(sinkWrappers);
+ // group nodes into multiple input groups
+ createMultipleInputGroups(orderedWrappers);
+ // apply optimizations to remove unnecessary nodes out of
multiple input groups
+ optimizeMultipleInputGroups(orderedWrappers);
+
+ // create the real multiple input nodes
+ return createMultipleInputNodes(sinkWrappers);
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Wrapping and Sorting
+ //
--------------------------------------------------------------------------------
+
+ private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>>
sinkNodes) {
+ Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new
HashMap<>();
+ AbstractExecNodeExactlyOnceVisitor visitor = new
AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?, ?> node) {
+ ExecNodeWrapper wrapper =
wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+ for (ExecNode<?, ?> input :
node.getInputNodes()) {
+ ExecNodeWrapper inputWrapper =
wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+ wrapper.inputs.add(inputWrapper);
+ inputWrapper.outputs.add(wrapper);
+ }
+ visitInputs(node);
+ }
+ };
+ sinkNodes.forEach(s -> s.accept(visitor));
+
+ List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+ for (ExecNode<?, ?> sink : sinkNodes) {
+ ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+ Preconditions.checkNotNull(sinkWrapper, "Sink node is
not wrapped. This is a bug.");
+ sinkWrappers.add(sinkWrapper);
+ }
+ return sinkWrappers;
+ }
+
+ private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper>
sinkWrappers) {
+ List<ExecNodeWrapper> result = new ArrayList<>();
+ Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+ Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+ while (!queue.isEmpty()) {
+ ExecNodeWrapper wrapper = queue.poll();
+ result.add(wrapper);
+ for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+ int visitCount =
visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+ if (visitCount == inputWrapper.outputs.size()) {
+ queue.offer(inputWrapper);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Creating
+ //
--------------------------------------------------------------------------------
+
+ private void createMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sinks to
sources
+ for (ExecNodeWrapper wrapper : orderedWrappers) {
+ // we skip nodes which cannot be a member of a multiple
input node
+ if (!canBeMultipleInputNodeMember(wrapper)) {
+ continue;
+ }
+
+ // we first try to assign this wrapper into the same
group with its outputs
+ MultipleInputGroup outputGroup =
canBeInSameGroupWithOutputs(wrapper);
+ if (outputGroup != null) {
+ wrapper.addToGroup(outputGroup);
+ continue;
+ }
+
+ // we then try to create a new multiple input group
with this node as the root
+ if (canBeRootOfMultipleInputGroup(wrapper)) {
+ wrapper.createGroup();
+ }
+
+ // all our attempts failed, this node will not be in a
multiple input node
+ }
+ }
+
+ private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+ if (wrapper.inputs.isEmpty()) {
+ // sources cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof BatchExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof StreamExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
Review comment:
These two checks could be replaced by a single `wrapper.execNode instanceof Exchange` check.
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organize {@link ExecNode}s into multiple input
nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+ private final boolean isStreaming;
+
+ public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+ this.isStreaming = isStreaming;
+ }
+
+ @Override
+ public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context) {
+ if (!isStreaming) {
+ // As multiple input nodes use function call to deliver
records between sub-operators,
+ // we cannot rely on network buffers to buffer records
not yet ready to be read,
+ // so only BLOCKING dam behavior is safe here.
+ // If conflict is detected under this stricter
constraint,
+ // we add a PIPELINED exchange to mark that its input
and output node cannot be merged
+ // into the same multiple input node
+ InputPriorityConflictResolver resolver = new
InputPriorityConflictResolver(
+ sinkNodes,
+ Collections.emptySet(),
+ ExecEdge.DamBehavior.BLOCKING,
+ ShuffleMode.PIPELINED);
+ resolver.detectAndResolve();
+ }
+
+ List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+ // sort all nodes in topological order, sinks come first and
sources come last
+ List<ExecNodeWrapper> orderedWrappers =
topologicalSort(sinkWrappers);
+ // group nodes into multiple input groups
+ createMultipleInputGroups(orderedWrappers);
+ // apply optimizations to remove unnecessary nodes out of
multiple input groups
+ optimizeMultipleInputGroups(orderedWrappers);
+
+ // create the real multiple input nodes
+ return createMultipleInputNodes(sinkWrappers);
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Wrapping and Sorting
+ //
--------------------------------------------------------------------------------
+
+ private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>>
sinkNodes) {
+ Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new
HashMap<>();
+ AbstractExecNodeExactlyOnceVisitor visitor = new
AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?, ?> node) {
+ ExecNodeWrapper wrapper =
wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+ for (ExecNode<?, ?> input :
node.getInputNodes()) {
+ ExecNodeWrapper inputWrapper =
wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+ wrapper.inputs.add(inputWrapper);
+ inputWrapper.outputs.add(wrapper);
+ }
+ visitInputs(node);
+ }
+ };
+ sinkNodes.forEach(s -> s.accept(visitor));
+
+ List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+ for (ExecNode<?, ?> sink : sinkNodes) {
+ ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+ Preconditions.checkNotNull(sinkWrapper, "Sink node is
not wrapped. This is a bug.");
+ sinkWrappers.add(sinkWrapper);
+ }
+ return sinkWrappers;
+ }
+
+ private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper>
sinkWrappers) {
+ List<ExecNodeWrapper> result = new ArrayList<>();
+ Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+ Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+ while (!queue.isEmpty()) {
+ ExecNodeWrapper wrapper = queue.poll();
+ result.add(wrapper);
+ for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+ int visitCount =
visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+ if (visitCount == inputWrapper.outputs.size()) {
+ queue.offer(inputWrapper);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Creating
+ //
--------------------------------------------------------------------------------
+
+ private void createMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sinks to
sources
+ for (ExecNodeWrapper wrapper : orderedWrappers) {
+ // we skip nodes which cannot be a member of a multiple
input node
+ if (!canBeMultipleInputNodeMember(wrapper)) {
+ continue;
+ }
+
+ // we first try to assign this wrapper into the same
group with its outputs
+ MultipleInputGroup outputGroup =
canBeInSameGroupWithOutputs(wrapper);
+ if (outputGroup != null) {
+ wrapper.addToGroup(outputGroup);
+ continue;
+ }
+
+ // we then try to create a new multiple input group
with this node as the root
+ if (canBeRootOfMultipleInputGroup(wrapper)) {
+ wrapper.createGroup();
+ }
+
+ // all our attempts failed, this node will not be in a
multiple input node
+ }
+ }
+
+ private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+ if (wrapper.inputs.isEmpty()) {
+ // sources cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof BatchExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof StreamExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * A node can only be assigned into the same multiple input group of
its outputs
+ * if all outputs have a group and are the same.
+ *
+ * @return the {@link MultipleInputGroup} of the outputs if all outputs
have a
+ * group and are the same, null otherwise
+ */
+ private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper
wrapper) {
+ if (wrapper.outputs.isEmpty()) {
+ return null;
+ }
+
+ MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+ if (outputGroup == null) {
+ return null;
+ }
+
+ for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+ if (outputWrapper.group != outputGroup) {
+ return null;
+ }
+ }
+
+ return outputGroup;
+ }
+
+ private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+ // only a node with more than one input can be the root,
+ // as one-input operator chaining are handled by operator chains
+ return wrapper.inputs.size() >= 2;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Optimizing
+ //
--------------------------------------------------------------------------------
+
+ private void optimizeMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sources to
sinks
+ for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+ ExecNodeWrapper wrapper = orderedWrappers.get(i);
+ MultipleInputGroup group = wrapper.group;
+ if (group == null) {
+ // we only consider nodes currently in a
multiple input group
+ continue;
+ }
+
+ boolean isUnion =
+ wrapper.execNode instanceof BatchExecUnion ||
wrapper.execNode instanceof StreamExecUnion;
+
+ if (group.members.size() == 1) {
+ Preconditions.checkState(
+ wrapper == group.root,
+ "The only member of a multiple input
group is not its root. This is a bug.");
+ // optimization 1. we clean up multiple input
groups with only 1 member,
+ // unless one of its input is a FLIP-27 source
(for maximizing source chaining),
+ // however unions do not apply to this
optimization because they're not real operators
+ if (isUnion ||
wrapper.inputs.stream().noneMatch(inputWrapper ->
isNewSource(inputWrapper.execNode))) {
+ wrapper.removeFromGroup();
+ }
+ continue;
+ }
+
+ if (!isTailOfMultipleInputGroup(wrapper)) {
+ // we're not removing a node from the middle of
a multiple input group
+ continue;
+ }
+
+ boolean shouldRemove = false;
+ if (isUnion) {
+ // optimization 2. we do not allow union to be
the tail of a multiple input
+ // as we're paying extra function calls for
this, unless one of the united
+ // input is a FLIP-27 source
+ shouldRemove =
wrapper.inputs.stream().noneMatch(inputWrapper ->
isNewSource(inputWrapper.execNode));
+ } else if (wrapper.inputs.size() == 1) {
+ // optimization 3. for one-input operators
we'll remove it unless its input
+ // is an exchange or a FLIP-27 source, this is
mainly to avoid the following
+ // pattern:
+ // non-chainable source -> calc --\
+ // join ->
+ // non-chainable source -> calc --/
+ // if we move two calcs into the multiple input
group rooted at the join, we're
+ // directly shuffling large amount of records
from the source without filtering
+ // by the calc
+ ExecNode<?, ?> input =
wrapper.inputs.get(0).execNode;
+ shouldRemove = !(input instanceof
BatchExecExchange) &&
+ !(input instanceof StreamExecExchange)
&&
+ !isNewSource(input);
+ }
+
+ // optimization 4. for singleton operations (for
example singleton global agg)
+ // we're not including it into the multiple input node
as we have to ensure that
+ // the whole multiple input can only have 1 parallelism.
+ // continuous singleton operations connected by
forwarding shuffle will be dealt
+ // together with optimization 3
+ shouldRemove |=
wrapper.inputs.stream().anyMatch(inputWrapper ->
+ inputWrapper.execNode instanceof
BatchExecExchange &&
+ ((BatchExecExchange)
inputWrapper.execNode)
+ .distribution.getType() ==
RelDistribution.Type.SINGLETON);
+
+ if (shouldRemove) {
+ wrapper.removeFromGroup();
+ }
+ }
+ }
+
+ private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+ Preconditions.checkNotNull(
+ wrapper.group,
+ "Exec node wrapper does not have a multiple input
group. This is a bug.");
+ for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+ if (inputWrapper.group == wrapper.group) {
+ // one of the input is in the same group, so
this node is not the tail of the group
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isNewSource(ExecNode<?, ?> node) {
+ if (node instanceof BatchExecBoundedStreamScan) {
+ BatchExecBoundedStreamScan scan =
(BatchExecBoundedStreamScan) node;
+ return
scan.boundedStreamTable().dataStream().getTransformation() instanceof
SourceTransformation;
+ } else if (node instanceof StreamExecDataStreamScan) {
+ StreamExecDataStreamScan scan =
(StreamExecDataStreamScan) node;
+ return
scan.dataStreamTable().dataStream().getTransformation() instanceof
SourceTransformation;
+ }
+ return false;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Nodes Creating
+ //
--------------------------------------------------------------------------------
+
+ private List<ExecNode<?, ?>>
createMultipleInputNodes(List<ExecNodeWrapper> sinkWrappers) {
+ List<ExecNode<?, ?>> result = new ArrayList<>();
+ Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap = new HashMap<>();
+ for (ExecNodeWrapper sinkWrapper : sinkWrappers) {
+ result.add(getMultipleInputNode(sinkWrapper, visitMap));
+ }
+ return result;
+ }
+
+ private ExecNode<?, ?> getMultipleInputNode(
+ ExecNodeWrapper wrapper,
+ Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap) {
+ if (visitMap.containsKey(wrapper)) {
+ return visitMap.get(wrapper);
+ }
+
+ for (int i = 0; i < wrapper.inputs.size(); i++) {
+ wrapper.execNode.replaceInputNode(i, (ExecNode)
getMultipleInputNode(wrapper.inputs.get(i), visitMap));
+ }
+
+ ExecNode<?, ?> ret;
+ if (wrapper.group != null && wrapper == wrapper.group.root) {
+ ret = createMultipleInputNode(wrapper.group, visitMap);
+ } else {
+ ret = wrapper.execNode;
+ }
+ visitMap.put(wrapper, ret);
+ return ret;
+ }
+
+ private ExecNode<?, ?> createMultipleInputNode(
+ MultipleInputGroup group,
+ Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap) {
+ // calculate the inputs of the multiple input node
+ List<Tuple2<ExecNode<?, ?>, ExecEdge>> inputs = new
ArrayList<>();
+ for (ExecNodeWrapper member : group.members) {
+ for (int i = 0; i < member.inputs.size(); i++) {
+ ExecNodeWrapper memberInput =
member.inputs.get(i);
+ if (group.members.contains(memberInput)) {
+ continue;
+ }
+ Preconditions.checkState(
+ visitMap.containsKey(memberInput),
+ "Input of a multiple input member is
not visited. This is a bug.");
+
+ ExecNode<?, ?> inputNode =
visitMap.get(memberInput);
+ ExecEdge inputEdge =
member.execNode.getInputEdges().get(i);
+ inputs.add(Tuple2.of(inputNode, inputEdge));
+ }
+ }
+
+ if (isStreaming) {
+ return createStreamMultipleInputNode(group, inputs);
+ } else {
+ return createBatchMultipleInputNode(group, inputs);
+ }
+ }
+
+ private StreamExecMultipleInputNode createStreamMultipleInputNode(
+ MultipleInputGroup group,
+ List<Tuple2<ExecNode<?, ?>, ExecEdge>> inputs) {
+ RelNode outputRel = (RelNode) group.root.execNode;
+ RelNode[] inputRels = new RelNode[inputs.size()];
+ for (int i = 0; i < inputs.size(); i++) {
+ inputRels[i] = (RelNode) inputs.get(i).f0;
+ }
+
+ return new StreamExecMultipleInputNode(
+ outputRel.getCluster(),
+ outputRel.getTraitSet(),
+ inputRels,
+ outputRel);
+ }
+
+ private BatchExecMultipleInputNode createBatchMultipleInputNode(
+ MultipleInputGroup group,
+ List<Tuple2<ExecNode<?, ?>, ExecEdge>> inputs) {
+ // first calculate the input orders using
InputPriorityConflictResolver
+ Set<ExecNode<?, ?>> inputSet = new HashSet<>();
+ for (Tuple2<ExecNode<?, ?>, ExecEdge> t : inputs) {
+ inputSet.add(t.f0);
+ }
+ InputPriorityConflictResolver resolver = new
InputPriorityConflictResolver(
+ Collections.singletonList(group.root.execNode),
+ inputSet,
+ ExecEdge.DamBehavior.BLOCKING,
+ ShuffleMode.PIPELINED);
+ Map<ExecNode<?, ?>, Integer> inputOrderMap =
resolver.calculateInputOrder();
+
+ // then create input rels and edges with the input orders
+ RelNode outputRel = (RelNode) group.root.execNode;
+ RelNode[] inputRels = new RelNode[inputs.size()];
+ ExecEdge[] inputEdges = new ExecEdge[inputs.size()];
+ for (int i = 0; i < inputs.size(); i++) {
+ ExecNode<?, ?> inputNode = inputs.get(i).f0;
+ ExecEdge originalInputEdge = inputs.get(i).f1;
+ inputRels[i] = (RelNode) inputNode;
+ inputEdges[i] = ExecEdge.builder()
+
.requiredShuffle(originalInputEdge.getRequiredShuffle())
+ .damBehavior(originalInputEdge.getDamBehavior())
+ .priority(inputOrderMap.get(inputNode))
+ .build();
+ }
+
+ return new BatchExecMultipleInputNode(
+ outputRel.getCluster(),
+ outputRel.getTraitSet(),
+ inputRels,
+ outputRel,
+ inputEdges);
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Helper Classes
+ //
--------------------------------------------------------------------------------
+
+ private static class ExecNodeWrapper {
+ private final ExecNode<?, ?> execNode;
+ private final List<ExecNodeWrapper> inputs;
+ private final List<ExecNodeWrapper> outputs;
+ private MultipleInputGroup group;
+
+ private ExecNodeWrapper(ExecNode<?, ?> execNode) {
+ this.execNode = execNode;
+ this.inputs = new ArrayList<>();
+ this.outputs = new ArrayList<>();
+ this.group = null;
+ }
+
+ private void createGroup() {
+ this.group = new MultipleInputGroup(this);
+ }
+
+ private void addToGroup(MultipleInputGroup group) {
Review comment:
rename to `replaceGroup` ?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organize {@link ExecNode}s into multiple input
nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+ private final boolean isStreaming;
+
+ public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+ this.isStreaming = isStreaming;
+ }
+
+ @Override
+ public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context) {
+ if (!isStreaming) {
+ // As multiple input nodes use function call to deliver
records between sub-operators,
+ // we cannot rely on network buffers to buffer records
not yet ready to be read,
+ // so only BLOCKING dam behavior is safe here.
+ // If conflict is detected under this stricter
constraint,
+ // we add a PIPELINED exchange to mark that its input
and output node cannot be merged
+ // into the same multiple input node
+ InputPriorityConflictResolver resolver = new
InputPriorityConflictResolver(
+ sinkNodes,
+ Collections.emptySet(),
+ ExecEdge.DamBehavior.BLOCKING,
+ ShuffleMode.PIPELINED);
+ resolver.detectAndResolve();
+ }
+
+ List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+ // sort all nodes in topological order, sinks come first and
sources come last
+ List<ExecNodeWrapper> orderedWrappers =
topologicalSort(sinkWrappers);
+ // group nodes into multiple input groups
+ createMultipleInputGroups(orderedWrappers);
+ // apply optimizations to remove unnecessary nodes out of
multiple input groups
Review comment:
Are these optimizations necessary? Or could any of them be removed while
the result remains correct?
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputPriorityConflictResolver.java
##########
@@ -97,31 +98,79 @@
public class InputPriorityConflictResolver {
private final List<ExecNode<?, ?>> roots;
+ private final Set<ExecNode<?, ?>> boundaries;
+ private final ExecEdge.DamBehavior safeDamBehavior;
+ private final ShuffleMode shuffleMode;
Review comment:
add some comments to explain the fields
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputPriorityConflictResolver.java
##########
@@ -97,31 +98,79 @@
public class InputPriorityConflictResolver {
private final List<ExecNode<?, ?>> roots;
+ private final Set<ExecNode<?, ?>> boundaries;
+ private final ExecEdge.DamBehavior safeDamBehavior;
+ private final ShuffleMode shuffleMode;
private TopologyGraph graph;
- public InputPriorityConflictResolver(List<ExecNode<?, ?>> roots) {
+ public InputPriorityConflictResolver(
+ List<ExecNode<?, ?>> roots,
+ Set<ExecNode<?, ?>> boundaries,
+ ExecEdge.DamBehavior safeDamBehavior,
+ ShuffleMode shuffleMode) {
Preconditions.checkArgument(
roots.stream().allMatch(root -> root instanceof
BatchExecNode),
"InputPriorityConflictResolver can only be used for
batch jobs.");
this.roots = roots;
+ this.boundaries = boundaries;
+ this.safeDamBehavior = safeDamBehavior;
+ this.shuffleMode = shuffleMode;
}
public void detectAndResolve() {
// build an initial topology graph
- graph = new TopologyGraph(roots);
+ graph = new TopologyGraph(roots, boundaries);
// check and resolve conflicts about input priorities
AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor = new
AbstractExecNodeExactlyOnceVisitor() {
@Override
protected void visitNode(ExecNode<?, ?> node) {
- visitInputs(node);
+ if (!boundaries.contains(node)) {
+ visitInputs(node);
+ }
checkInputPriorities(node);
}
};
roots.forEach(n -> n.accept(inputPriorityVisitor));
}
+ public Map<ExecNode<?, ?>, Integer> calculateInputOrder() {
+ // we first calculate the topological order of all nodes in the
graph
+ detectAndResolve();
+ // check that no exchange is contained in the multiple input
node
+ AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor = new
AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?, ?> node) {
+ if (boundaries.contains(node)) {
+ return;
+ }
+ visitInputs(node);
+ Preconditions.checkState(
+ !(node instanceof BatchExecExchange),
+ "There is exchange in a multiple input
node. This is a bug.");
+ }
+ };
+ roots.forEach(n -> n.accept(inputPriorityVisitor));
Review comment:
Please add a check here: the number of `roots` should always be 1. If we separate
this part of the logic into another class, we can pass `ExecNode<?, ?> root`
instead of `List<ExecNode<?, ?>> roots` as a constructor parameter.
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/InputPriorityConflictResolver.java
##########
@@ -97,31 +98,79 @@
public class InputPriorityConflictResolver {
private final List<ExecNode<?, ?>> roots;
+ private final Set<ExecNode<?, ?>> boundaries;
+ private final ExecEdge.DamBehavior safeDamBehavior;
+ private final ShuffleMode shuffleMode;
private TopologyGraph graph;
- public InputPriorityConflictResolver(List<ExecNode<?, ?>> roots) {
+ public InputPriorityConflictResolver(
+ List<ExecNode<?, ?>> roots,
+ Set<ExecNode<?, ?>> boundaries,
+ ExecEdge.DamBehavior safeDamBehavior,
+ ShuffleMode shuffleMode) {
Preconditions.checkArgument(
roots.stream().allMatch(root -> root instanceof
BatchExecNode),
"InputPriorityConflictResolver can only be used for
batch jobs.");
this.roots = roots;
+ this.boundaries = boundaries;
+ this.safeDamBehavior = safeDamBehavior;
+ this.shuffleMode = shuffleMode;
}
public void detectAndResolve() {
// build an initial topology graph
- graph = new TopologyGraph(roots);
+ graph = new TopologyGraph(roots, boundaries);
// check and resolve conflicts about input priorities
AbstractExecNodeExactlyOnceVisitor inputPriorityVisitor = new
AbstractExecNodeExactlyOnceVisitor() {
@Override
protected void visitNode(ExecNode<?, ?> node) {
- visitInputs(node);
+ if (!boundaries.contains(node)) {
+ visitInputs(node);
+ }
checkInputPriorities(node);
}
};
roots.forEach(n -> n.accept(inputPriorityVisitor));
}
+ public Map<ExecNode<?, ?>, Integer> calculateInputOrder() {
Review comment:
We should introduce another class to calculate the input orders, perhaps named
`InputOrderDerivation`. Maybe we also need a common base class for
`InputOrderDerivation` and `InputPriorityConflictResolver`.
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organize {@link ExecNode}s into multiple input
nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+ private final boolean isStreaming;
+
+ public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+ this.isStreaming = isStreaming;
+ }
+
+ @Override
+ public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context) {
+ if (!isStreaming) {
+ // As multiple input nodes use function call to deliver
records between sub-operators,
+ // we cannot rely on network buffers to buffer records
not yet ready to be read,
+ // so only BLOCKING dam behavior is safe here.
+ // If conflict is detected under this stricter
constraint,
+ // we add a PIPELINED exchange to mark that its input
and output node cannot be merged
+ // into the same multiple input node
+ InputPriorityConflictResolver resolver = new
InputPriorityConflictResolver(
+ sinkNodes,
+ Collections.emptySet(),
+ ExecEdge.DamBehavior.BLOCKING,
+ ShuffleMode.PIPELINED);
+ resolver.detectAndResolve();
+ }
+
+ List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
Review comment:
sink -> root
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organize {@link ExecNode}s into multiple input
nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+ private final boolean isStreaming;
+
+ public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+ this.isStreaming = isStreaming;
+ }
+
+ @Override
+ public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context) {
+ if (!isStreaming) {
+ // As multiple input nodes use function call to deliver
records between sub-operators,
+ // we cannot rely on network buffers to buffer records
not yet ready to be read,
+ // so only BLOCKING dam behavior is safe here.
+ // If conflict is detected under this stricter
constraint,
+ // we add a PIPELINED exchange to mark that its input
and output node cannot be merged
+ // into the same multiple input node
+ InputPriorityConflictResolver resolver = new
InputPriorityConflictResolver(
+ sinkNodes,
+ Collections.emptySet(),
+ ExecEdge.DamBehavior.BLOCKING,
+ ShuffleMode.PIPELINED);
+ resolver.detectAndResolve();
+ }
+
+ List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+ // sort all nodes in topological order, sinks come first and
sources come last
+ List<ExecNodeWrapper> orderedWrappers =
topologicalSort(sinkWrappers);
+ // group nodes into multiple input groups
+ createMultipleInputGroups(orderedWrappers);
+ // apply optimizations to remove unnecessary nodes out of
multiple input groups
+ optimizeMultipleInputGroups(orderedWrappers);
+
+ // create the real multiple input nodes
+ return createMultipleInputNodes(sinkWrappers);
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Wrapping and Sorting
+ //
--------------------------------------------------------------------------------
+
+ private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>>
sinkNodes) {
+ Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new
HashMap<>();
+ AbstractExecNodeExactlyOnceVisitor visitor = new
AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?, ?> node) {
+ ExecNodeWrapper wrapper =
wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+ for (ExecNode<?, ?> input :
node.getInputNodes()) {
+ ExecNodeWrapper inputWrapper =
wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+ wrapper.inputs.add(inputWrapper);
+ inputWrapper.outputs.add(wrapper);
+ }
+ visitInputs(node);
+ }
+ };
+ sinkNodes.forEach(s -> s.accept(visitor));
+
+ List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+ for (ExecNode<?, ?> sink : sinkNodes) {
+ ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+ Preconditions.checkNotNull(sinkWrapper, "Sink node is
not wrapped. This is a bug.");
+ sinkWrappers.add(sinkWrapper);
+ }
+ return sinkWrappers;
+ }
+
+ private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper>
sinkWrappers) {
+ List<ExecNodeWrapper> result = new ArrayList<>();
+ Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+ Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+ while (!queue.isEmpty()) {
+ ExecNodeWrapper wrapper = queue.poll();
+ result.add(wrapper);
+ for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+ int visitCount =
visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+ if (visitCount == inputWrapper.outputs.size()) {
+ queue.offer(inputWrapper);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Creating
+ //
--------------------------------------------------------------------------------
+
+ private void createMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sinks to
sources
+ for (ExecNodeWrapper wrapper : orderedWrappers) {
+ // we skip nodes which cannot be a member of a multiple
input node
+ if (!canBeMultipleInputNodeMember(wrapper)) {
+ continue;
+ }
+
+ // we first try to assign this wrapper into the same
group with its outputs
+ MultipleInputGroup outputGroup =
canBeInSameGroupWithOutputs(wrapper);
+ if (outputGroup != null) {
+ wrapper.addToGroup(outputGroup);
+ continue;
+ }
+
+ // we then try to create a new multiple input group
with this node as the root
+ if (canBeRootOfMultipleInputGroup(wrapper)) {
+ wrapper.createGroup();
+ }
+
+ // all our attempts failed, this node will not be in a
multiple input node
+ }
+ }
+
+ private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+ if (wrapper.inputs.isEmpty()) {
+ // sources cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof BatchExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof StreamExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * A node can only be assigned into the same multiple input group of
its outputs
+ * if all outputs have a group and are the same.
+ *
+ * @return the {@link MultipleInputGroup} of the outputs if all outputs
have a
+ * group and are the same, null otherwise
+ */
+ private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper
wrapper) {
+ if (wrapper.outputs.isEmpty()) {
+ return null;
+ }
+
+ MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+ if (outputGroup == null) {
+ return null;
+ }
+
+ for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+ if (outputWrapper.group != outputGroup) {
+ return null;
+ }
+ }
+
+ return outputGroup;
+ }
+
+ private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+ // only a node with more than one input can be the root,
+ // as one-input operator chaining are handled by operator chains
+ return wrapper.inputs.size() >= 2;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Optimizing
+ //
--------------------------------------------------------------------------------
+
+ private void optimizeMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sources to
sinks
+ for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+ ExecNodeWrapper wrapper = orderedWrappers.get(i);
+ MultipleInputGroup group = wrapper.group;
+ if (group == null) {
+ // we only consider nodes currently in a
multiple input group
+ continue;
+ }
+
+ boolean isUnion =
+ wrapper.execNode instanceof BatchExecUnion ||
wrapper.execNode instanceof StreamExecUnion;
+
+ if (group.members.size() == 1) {
+ Preconditions.checkState(
+ wrapper == group.root,
+ "The only member of a multiple input
group is not its root. This is a bug.");
+ // optimization 1. we clean up multiple input
groups with only 1 member,
+ // unless one of its input is a FLIP-27 source
(for maximizing source chaining),
+ // however unions do not apply to this
optimization because they're not real operators
+ if (isUnion ||
wrapper.inputs.stream().noneMatch(inputWrapper ->
isNewSource(inputWrapper.execNode))) {
+ wrapper.removeFromGroup();
+ }
+ continue;
+ }
+
+ if (!isTailOfMultipleInputGroup(wrapper)) {
+ // we're not removing a node from the middle of
a multiple input group
+ continue;
+ }
+
+ boolean shouldRemove = false;
+ if (isUnion) {
+ // optimization 2. we do not allow union to be
the tail of a multiple input
+ // as we're paying extra function calls for
this, unless one of the united
+ // input is a FLIP-27 source
+ shouldRemove =
wrapper.inputs.stream().noneMatch(inputWrapper ->
isNewSource(inputWrapper.execNode));
+ } else if (wrapper.inputs.size() == 1) {
+ // optimization 3. for one-input operators
we'll remove it unless its input
+ // is an exchange or a FLIP-27 source, this is
mainly to avoid the following
+ // pattern:
+ // non-chainable source -> calc --\
+ // join ->
+ // non-chainable source -> calc --/
+ // if we move two calcs into the multiple input
group rooted at the join, we're
+ // directly shuffling large amount of records
from the source without filtering
+ // by the calc
+ ExecNode<?, ?> input =
wrapper.inputs.get(0).execNode;
+ shouldRemove = !(input instanceof
BatchExecExchange) &&
+ !(input instanceof StreamExecExchange)
&&
+ !isNewSource(input);
+ }
+
+ // optimization 4. for singleton operations (for
example singleton global agg)
+ // we're not including it into the multiple input node
as we have to ensure that
+ // the whole multiple input can only have 1 parallelism.
+ // continuous singleton operations connected by
forwarding shuffle will be dealt
+ // together with optimization 3
+ shouldRemove |=
wrapper.inputs.stream().anyMatch(inputWrapper ->
+ inputWrapper.execNode instanceof
BatchExecExchange &&
+ ((BatchExecExchange)
inputWrapper.execNode)
+ .distribution.getType() ==
RelDistribution.Type.SINGLETON);
+
+ if (shouldRemove) {
+ wrapper.removeFromGroup();
+ }
+ }
+ }
+
+ private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+ Preconditions.checkNotNull(
+ wrapper.group,
+ "Exec node wrapper does not have a multiple input
group. This is a bug.");
+ for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+ if (inputWrapper.group == wrapper.group) {
+ // one of the input is in the same group, so
this node is not the tail of the group
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isNewSource(ExecNode<?, ?> node) {
+ if (node instanceof BatchExecBoundedStreamScan) {
+ BatchExecBoundedStreamScan scan =
(BatchExecBoundedStreamScan) node;
+ return
scan.boundedStreamTable().dataStream().getTransformation() instanceof
SourceTransformation;
+ } else if (node instanceof StreamExecDataStreamScan) {
+ StreamExecDataStreamScan scan =
(StreamExecDataStreamScan) node;
+ return
scan.dataStreamTable().dataStream().getTransformation() instanceof
SourceTransformation;
+ }
+ return false;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Nodes Creating
+ //
--------------------------------------------------------------------------------
+
+ private List<ExecNode<?, ?>>
createMultipleInputNodes(List<ExecNodeWrapper> sinkWrappers) {
+ List<ExecNode<?, ?>> result = new ArrayList<>();
+ Map<ExecNodeWrapper, ExecNode<?, ?>> visitMap = new HashMap<>();
Review comment:
visitMap -> visitedMap
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organize {@link ExecNode}s into multiple input
nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+ private final boolean isStreaming;
+
+ public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+ this.isStreaming = isStreaming;
+ }
+
+ @Override
+ public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context) {
+ if (!isStreaming) {
+ // As multiple input nodes use function call to deliver
records between sub-operators,
+ // we cannot rely on network buffers to buffer records
not yet ready to be read,
+ // so only BLOCKING dam behavior is safe here.
+ // If conflict is detected under this stricter
constraint,
+ // we add a PIPELINED exchange to mark that its input
and output node cannot be merged
+ // into the same multiple input node
+ InputPriorityConflictResolver resolver = new
InputPriorityConflictResolver(
+ sinkNodes,
+ Collections.emptySet(),
+ ExecEdge.DamBehavior.BLOCKING,
+ ShuffleMode.PIPELINED);
+ resolver.detectAndResolve();
+ }
+
+ List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+ // sort all nodes in topological order, sinks come first and
sources come last
+ List<ExecNodeWrapper> orderedWrappers =
topologicalSort(sinkWrappers);
+ // group nodes into multiple input groups
+ createMultipleInputGroups(orderedWrappers);
+ // apply optimizations to remove unnecessary nodes out of
multiple input groups
+ optimizeMultipleInputGroups(orderedWrappers);
+
+ // create the real multiple input nodes
+ return createMultipleInputNodes(sinkWrappers);
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Wrapping and Sorting
+ //
--------------------------------------------------------------------------------
+
+ private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>>
sinkNodes) {
+ Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new
HashMap<>();
+ AbstractExecNodeExactlyOnceVisitor visitor = new
AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?, ?> node) {
+ ExecNodeWrapper wrapper =
wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+ for (ExecNode<?, ?> input :
node.getInputNodes()) {
+ ExecNodeWrapper inputWrapper =
wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+ wrapper.inputs.add(inputWrapper);
+ inputWrapper.outputs.add(wrapper);
+ }
+ visitInputs(node);
+ }
+ };
+ sinkNodes.forEach(s -> s.accept(visitor));
+
+ List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+ for (ExecNode<?, ?> sink : sinkNodes) {
+ ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+ Preconditions.checkNotNull(sinkWrapper, "Sink node is
not wrapped. This is a bug.");
+ sinkWrappers.add(sinkWrapper);
+ }
+ return sinkWrappers;
+ }
+
+ private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper>
sinkWrappers) {
+ List<ExecNodeWrapper> result = new ArrayList<>();
+ Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+ Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+ while (!queue.isEmpty()) {
+ ExecNodeWrapper wrapper = queue.poll();
+ result.add(wrapper);
+ for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+ int visitCount =
visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+ if (visitCount == inputWrapper.outputs.size()) {
+ queue.offer(inputWrapper);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Creating
+ //
--------------------------------------------------------------------------------
+
+ private void createMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sinks to
sources
+ for (ExecNodeWrapper wrapper : orderedWrappers) {
+ // we skip nodes which cannot be a member of a multiple
input node
+ if (!canBeMultipleInputNodeMember(wrapper)) {
+ continue;
+ }
+
+ // we first try to assign this wrapper into the same
group with its outputs
+ MultipleInputGroup outputGroup =
canBeInSameGroupWithOutputs(wrapper);
+ if (outputGroup != null) {
+ wrapper.addToGroup(outputGroup);
+ continue;
+ }
+
+ // we then try to create a new multiple input group
with this node as the root
+ if (canBeRootOfMultipleInputGroup(wrapper)) {
+ wrapper.createGroup();
+ }
+
+ // all our attempts failed, this node will not be in a
multiple input node
+ }
+ }
+
+ private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+ if (wrapper.inputs.isEmpty()) {
+ // sources cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof BatchExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof StreamExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * A node can only be assigned into the same multiple input group of
its outputs
+ * if all outputs have a group and are the same.
+ *
+ * @return the {@link MultipleInputGroup} of the outputs if all outputs
have a
+ * group and are the same, null otherwise
+ */
+ private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper
wrapper) {
+ if (wrapper.outputs.isEmpty()) {
+ return null;
+ }
+
+ MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+ if (outputGroup == null) {
+ return null;
+ }
+
+ for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+ if (outputWrapper.group != outputGroup) {
+ return null;
+ }
+ }
+
+ return outputGroup;
+ }
+
+ private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+ // only a node with more than one input can be the root,
+ // as one-input operator chaining are handled by operator chains
+ return wrapper.inputs.size() >= 2;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Optimizing
+ //
--------------------------------------------------------------------------------
+
+ private void optimizeMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sources to
sinks
+ for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+ ExecNodeWrapper wrapper = orderedWrappers.get(i);
+ MultipleInputGroup group = wrapper.group;
+ if (group == null) {
+ // we only consider nodes currently in a
multiple input group
+ continue;
+ }
+
+ boolean isUnion =
+ wrapper.execNode instanceof BatchExecUnion ||
wrapper.execNode instanceof StreamExecUnion;
Review comment:
`wrapper.execNode instanceof Union`
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organize {@link ExecNode}s into multiple input
nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+ private final boolean isStreaming;
+
+ public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+ this.isStreaming = isStreaming;
+ }
+
+ @Override
+ public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context) {
+ if (!isStreaming) {
+ // As multiple input nodes use function call to deliver
records between sub-operators,
+ // we cannot rely on network buffers to buffer records
not yet ready to be read,
+ // so only BLOCKING dam behavior is safe here.
+ // If conflict is detected under this stricter
constraint,
+ // we add a PIPELINED exchange to mark that its input
and output node cannot be merged
+ // into the same multiple input node
+ InputPriorityConflictResolver resolver = new
InputPriorityConflictResolver(
+ sinkNodes,
+ Collections.emptySet(),
+ ExecEdge.DamBehavior.BLOCKING,
+ ShuffleMode.PIPELINED);
+ resolver.detectAndResolve();
+ }
+
+ List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+ // sort all nodes in topological order, sinks come first and
sources come last
+ List<ExecNodeWrapper> orderedWrappers =
topologicalSort(sinkWrappers);
+ // group nodes into multiple input groups
+ createMultipleInputGroups(orderedWrappers);
+ // apply optimizations to remove unnecessary nodes out of
multiple input groups
+ optimizeMultipleInputGroups(orderedWrappers);
+
+ // create the real multiple input nodes
+ return createMultipleInputNodes(sinkWrappers);
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Wrapping and Sorting
+ //
--------------------------------------------------------------------------------
+
+ private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>>
sinkNodes) {
+ Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new
HashMap<>();
+ AbstractExecNodeExactlyOnceVisitor visitor = new
AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?, ?> node) {
+ ExecNodeWrapper wrapper =
wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+ for (ExecNode<?, ?> input :
node.getInputNodes()) {
+ ExecNodeWrapper inputWrapper =
wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+ wrapper.inputs.add(inputWrapper);
+ inputWrapper.outputs.add(wrapper);
+ }
+ visitInputs(node);
+ }
+ };
+ sinkNodes.forEach(s -> s.accept(visitor));
+
+ List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+ for (ExecNode<?, ?> sink : sinkNodes) {
+ ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+ Preconditions.checkNotNull(sinkWrapper, "Sink node is
not wrapped. This is a bug.");
+ sinkWrappers.add(sinkWrapper);
+ }
+ return sinkWrappers;
+ }
+
+ private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper>
sinkWrappers) {
+ List<ExecNodeWrapper> result = new ArrayList<>();
+ Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+ Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+ while (!queue.isEmpty()) {
+ ExecNodeWrapper wrapper = queue.poll();
+ result.add(wrapper);
+ for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+ int visitCount =
visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+ if (visitCount == inputWrapper.outputs.size()) {
+ queue.offer(inputWrapper);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Creating
+ //
--------------------------------------------------------------------------------
+
+ private void createMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sinks to
sources
+ for (ExecNodeWrapper wrapper : orderedWrappers) {
+ // we skip nodes which cannot be a member of a multiple
input node
+ if (!canBeMultipleInputNodeMember(wrapper)) {
+ continue;
+ }
+
+ // we first try to assign this wrapper into the same
group with its outputs
+ MultipleInputGroup outputGroup =
canBeInSameGroupWithOutputs(wrapper);
+ if (outputGroup != null) {
+ wrapper.addToGroup(outputGroup);
+ continue;
+ }
+
+ // we then try to create a new multiple input group
with this node as the root
+ if (canBeRootOfMultipleInputGroup(wrapper)) {
+ wrapper.createGroup();
+ }
+
+ // all our attempts failed, this node will not be in a
multiple input node
+ }
+ }
+
+ private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+ if (wrapper.inputs.isEmpty()) {
+ // sources cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof BatchExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof StreamExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * A node can only be assigned into the same multiple input group of
its outputs
+ * if all outputs have a group and are the same.
+ *
+ * @return the {@link MultipleInputGroup} of the outputs if all outputs
have a
+ * group and are the same, null otherwise
+ */
+ private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper
wrapper) {
+ if (wrapper.outputs.isEmpty()) {
+ return null;
+ }
+
+ MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+ if (outputGroup == null) {
+ return null;
+ }
+
+ for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+ if (outputWrapper.group != outputGroup) {
+ return null;
+ }
+ }
+
+ return outputGroup;
+ }
+
+ private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+ // only a node with more than one input can be the root,
+ // as one-input operator chaining are handled by operator chains
+ return wrapper.inputs.size() >= 2;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Optimizing
+ //
--------------------------------------------------------------------------------
+
+ private void optimizeMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sources to
sinks
+ for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+ ExecNodeWrapper wrapper = orderedWrappers.get(i);
+ MultipleInputGroup group = wrapper.group;
+ if (group == null) {
+ // we only consider nodes currently in a
multiple input group
+ continue;
+ }
+
+ boolean isUnion =
+ wrapper.execNode instanceof BatchExecUnion ||
wrapper.execNode instanceof StreamExecUnion;
+
+ if (group.members.size() == 1) {
+ Preconditions.checkState(
+ wrapper == group.root,
+ "The only member of a multiple input
group is not its root. This is a bug.");
+ // optimization 1. we clean up multiple input
groups with only 1 member,
+ // unless one of its input is a FLIP-27 source
(for maximizing source chaining),
+ // however unions do not apply to this
optimization because they're not real operators
+ if (isUnion ||
wrapper.inputs.stream().noneMatch(inputWrapper ->
isNewSource(inputWrapper.execNode))) {
+ wrapper.removeFromGroup();
+ }
+ continue;
+ }
+
+ if (!isTailOfMultipleInputGroup(wrapper)) {
+ // we're not removing a node from the middle of
a multiple input group
+ continue;
+ }
+
+ boolean shouldRemove = false;
+ if (isUnion) {
+ // optimization 2. we do not allow union to be
the tail of a multiple input
+ // as we're paying extra function calls for
this, unless one of the united
+ // input is a FLIP-27 source
+ shouldRemove =
wrapper.inputs.stream().noneMatch(inputWrapper ->
isNewSource(inputWrapper.execNode));
+ } else if (wrapper.inputs.size() == 1) {
+ // optimization 3. for one-input operators
we'll remove it unless its input
+ // is an exchange or a FLIP-27 source, this is
mainly to avoid the following
+ // pattern:
+ // non-chainable source -> calc --\
+ // join ->
+ // non-chainable source -> calc --/
+ // if we move two calcs into the multiple input
group rooted at the join, we're
+ // directly shuffling large amount of records
from the source without filtering
+ // by the calc
+ ExecNode<?, ?> input =
wrapper.inputs.get(0).execNode;
+ shouldRemove = !(input instanceof
BatchExecExchange) &&
+ !(input instanceof StreamExecExchange)
&&
+ !isNewSource(input);
+ }
+
+ // optimization 4. for singleton operations (for
example singleton global agg)
+ // we're not including it into the multiple input node
as we have to ensure that
+ // the whole multiple input can only have 1 parallelism.
+ // continuous singleton operations connected by
forwarding shuffle will be dealt
+ // together with optimization 3
+ shouldRemove |=
wrapper.inputs.stream().anyMatch(inputWrapper ->
+ inputWrapper.execNode instanceof
BatchExecExchange &&
+ ((BatchExecExchange)
inputWrapper.execNode)
+ .distribution.getType() ==
RelDistribution.Type.SINGLETON);
+
+ if (shouldRemove) {
+ wrapper.removeFromGroup();
+ }
+ }
+ }
+
+ private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+ Preconditions.checkNotNull(
+ wrapper.group,
+ "Exec node wrapper does not have a multiple input
group. This is a bug.");
+ for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+ if (inputWrapper.group == wrapper.group) {
+ // one of the input is in the same group, so
this node is not the tail of the group
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean isNewSource(ExecNode<?, ?> node) {
Review comment:
A `SourceProvider` could also provide a new (FLIP-27) `Source`.
##########
File path:
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/reuse/MultipleInputNodeCreationProcessor.java
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.reuse;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import
org.apache.flink.table.planner.plan.nodes.exec.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecBoundedStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecMultipleInputNode;
+import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchExecUnion;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecMultipleInputNode;
+import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext;
+import org.apache.flink.table.planner.plan.nodes.process.DAGProcessor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelNode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ * A {@link DAGProcessor} which organize {@link ExecNode}s into multiple input
nodes.
+ *
+ * <p>For a detailed explanation of the algorithm, see appendix of the
+ * <a
href="https://docs.google.com/document/d/1qKVohV12qn-bM51cBZ8Hcgp31ntwClxjoiNBUOqVHsI">design
doc</a>.
+ */
+public class MultipleInputNodeCreationProcessor implements DAGProcessor {
+
+ private final boolean isStreaming;
+
+ public MultipleInputNodeCreationProcessor(boolean isStreaming) {
+ this.isStreaming = isStreaming;
+ }
+
+ @Override
+ public List<ExecNode<?, ?>> process(List<ExecNode<?, ?>> sinkNodes,
DAGProcessContext context) {
+ if (!isStreaming) {
+ // As multiple input nodes use function call to deliver
records between sub-operators,
+ // we cannot rely on network buffers to buffer records
not yet ready to be read,
+ // so only BLOCKING dam behavior is safe here.
+ // If conflict is detected under this stricter
constraint,
+ // we add a PIPELINED exchange to mark that its input
and output node cannot be merged
+ // into the same multiple input node
+ InputPriorityConflictResolver resolver = new
InputPriorityConflictResolver(
+ sinkNodes,
+ Collections.emptySet(),
+ ExecEdge.DamBehavior.BLOCKING,
+ ShuffleMode.PIPELINED);
+ resolver.detectAndResolve();
+ }
+
+ List<ExecNodeWrapper> sinkWrappers = wrapExecNodes(sinkNodes);
+ // sort all nodes in topological order, sinks come first and
sources come last
+ List<ExecNodeWrapper> orderedWrappers =
topologicalSort(sinkWrappers);
+ // group nodes into multiple input groups
+ createMultipleInputGroups(orderedWrappers);
+ // apply optimizations to remove unnecessary nodes out of
multiple input groups
+ optimizeMultipleInputGroups(orderedWrappers);
+
+ // create the real multiple input nodes
+ return createMultipleInputNodes(sinkWrappers);
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Wrapping and Sorting
+ //
--------------------------------------------------------------------------------
+
+ private List<ExecNodeWrapper> wrapExecNodes(List<ExecNode<?, ?>>
sinkNodes) {
+ Map<ExecNode<?, ?>, ExecNodeWrapper> wrapperMap = new
HashMap<>();
+ AbstractExecNodeExactlyOnceVisitor visitor = new
AbstractExecNodeExactlyOnceVisitor() {
+ @Override
+ protected void visitNode(ExecNode<?, ?> node) {
+ ExecNodeWrapper wrapper =
wrapperMap.computeIfAbsent(node, k -> new ExecNodeWrapper(node));
+ for (ExecNode<?, ?> input :
node.getInputNodes()) {
+ ExecNodeWrapper inputWrapper =
wrapperMap.computeIfAbsent(input, k -> new ExecNodeWrapper(input));
+ wrapper.inputs.add(inputWrapper);
+ inputWrapper.outputs.add(wrapper);
+ }
+ visitInputs(node);
+ }
+ };
+ sinkNodes.forEach(s -> s.accept(visitor));
+
+ List<ExecNodeWrapper> sinkWrappers = new ArrayList<>();
+ for (ExecNode<?, ?> sink : sinkNodes) {
+ ExecNodeWrapper sinkWrapper = wrapperMap.get(sink);
+ Preconditions.checkNotNull(sinkWrapper, "Sink node is
not wrapped. This is a bug.");
+ sinkWrappers.add(sinkWrapper);
+ }
+ return sinkWrappers;
+ }
+
+ private List<ExecNodeWrapper> topologicalSort(List<ExecNodeWrapper>
sinkWrappers) {
+ List<ExecNodeWrapper> result = new ArrayList<>();
+ Queue<ExecNodeWrapper> queue = new LinkedList<>(sinkWrappers);
+ Map<ExecNodeWrapper, Integer> visitCountMap = new HashMap<>();
+
+ while (!queue.isEmpty()) {
+ ExecNodeWrapper wrapper = queue.poll();
+ result.add(wrapper);
+ for (ExecNodeWrapper inputWrapper : wrapper.inputs) {
+ int visitCount =
visitCountMap.compute(inputWrapper, (k, v) -> v == null ? 1 : v + 1);
+ if (visitCount == inputWrapper.outputs.size()) {
+ queue.offer(inputWrapper);
+ }
+ }
+ }
+
+ return result;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Creating
+ //
--------------------------------------------------------------------------------
+
+ private void createMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sinks to
sources
+ for (ExecNodeWrapper wrapper : orderedWrappers) {
+ // we skip nodes which cannot be a member of a multiple
input node
+ if (!canBeMultipleInputNodeMember(wrapper)) {
+ continue;
+ }
+
+ // we first try to assign this wrapper into the same
group with its outputs
+ MultipleInputGroup outputGroup =
canBeInSameGroupWithOutputs(wrapper);
+ if (outputGroup != null) {
+ wrapper.addToGroup(outputGroup);
+ continue;
+ }
+
+ // we then try to create a new multiple input group
with this node as the root
+ if (canBeRootOfMultipleInputGroup(wrapper)) {
+ wrapper.createGroup();
+ }
+
+ // all our attempts failed, this node will not be in a
multiple input node
+ }
+ }
+
+ private boolean canBeMultipleInputNodeMember(ExecNodeWrapper wrapper) {
+ if (wrapper.inputs.isEmpty()) {
+ // sources cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof BatchExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+ if (wrapper.execNode instanceof StreamExecExchange) {
+ // exchange cannot be a member of multiple input node
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * A node can only be assigned into the same multiple input group of
its outputs
+ * if all outputs have a group and are the same.
+ *
+ * @return the {@link MultipleInputGroup} of the outputs if all outputs
have a
+ * group and are the same, null otherwise
+ */
+ private MultipleInputGroup canBeInSameGroupWithOutputs(ExecNodeWrapper
wrapper) {
+ if (wrapper.outputs.isEmpty()) {
+ return null;
+ }
+
+ MultipleInputGroup outputGroup = wrapper.outputs.get(0).group;
+ if (outputGroup == null) {
+ return null;
+ }
+
+ for (ExecNodeWrapper outputWrapper : wrapper.outputs) {
+ if (outputWrapper.group != outputGroup) {
+ return null;
+ }
+ }
+
+ return outputGroup;
+ }
+
+ private boolean canBeRootOfMultipleInputGroup(ExecNodeWrapper wrapper) {
+ // only a node with more than one input can be the root,
+ // as one-input operator chaining are handled by operator chains
+ return wrapper.inputs.size() >= 2;
+ }
+
+ //
--------------------------------------------------------------------------------
+ // Multiple Input Groups Optimizing
+ //
--------------------------------------------------------------------------------
+
+ private void optimizeMultipleInputGroups(List<ExecNodeWrapper>
orderedWrappers) {
+ // wrappers are checked in topological order from sources to
sinks
+ for (int i = orderedWrappers.size() - 1; i >= 0; i--) {
+ ExecNodeWrapper wrapper = orderedWrappers.get(i);
+ MultipleInputGroup group = wrapper.group;
+ if (group == null) {
+ // we only consider nodes currently in a
multiple input group
+ continue;
+ }
+
+ boolean isUnion =
+ wrapper.execNode instanceof BatchExecUnion ||
wrapper.execNode instanceof StreamExecUnion;
+
+ if (group.members.size() == 1) {
+ Preconditions.checkState(
+ wrapper == group.root,
+ "The only member of a multiple input
group is not its root. This is a bug.");
+ // optimization 1. we clean up multiple input
groups with only 1 member,
+ // unless one of its input is a FLIP-27 source
(for maximizing source chaining),
+ // however unions do not apply to this
optimization because they're not real operators
+ if (isUnion ||
wrapper.inputs.stream().noneMatch(inputWrapper ->
isNewSource(inputWrapper.execNode))) {
+ wrapper.removeFromGroup();
+ }
+ continue;
+ }
+
+ if (!isTailOfMultipleInputGroup(wrapper)) {
+ // we're not removing a node from the middle of
a multiple input group
+ continue;
+ }
+
+ boolean shouldRemove = false;
+ if (isUnion) {
+ // optimization 2. we do not allow union to be
the tail of a multiple input
+ // as we're paying extra function calls for
this, unless one of the united
+ // input is a FLIP-27 source
+ shouldRemove =
wrapper.inputs.stream().noneMatch(inputWrapper ->
isNewSource(inputWrapper.execNode));
+ } else if (wrapper.inputs.size() == 1) {
+ // optimization 3. for one-input operators
we'll remove it unless its input
+ // is an exchange or a FLIP-27 source, this is
mainly to avoid the following
+ // pattern:
+ // non-chainable source -> calc --\
+ // join ->
+ // non-chainable source -> calc --/
+ // if we move two calcs into the multiple input
group rooted at the join, we're
+ // directly shuffling large amount of records
from the source without filtering
+ // by the calc
+ ExecNode<?, ?> input =
wrapper.inputs.get(0).execNode;
+ shouldRemove = !(input instanceof
BatchExecExchange) &&
+ !(input instanceof StreamExecExchange)
&&
+ !isNewSource(input);
+ }
+
+ // optimization 4. for singleton operations (for
example singleton global agg)
+ // we're not including it into the multiple input node
as we have to ensure that
+ // the whole multiple input can only have 1 parallelism.
+ // continuous singleton operations connected by
forwarding shuffle will be dealt
+ // together with optimization 3
+ shouldRemove |=
wrapper.inputs.stream().anyMatch(inputWrapper ->
+ inputWrapper.execNode instanceof
BatchExecExchange &&
+ ((BatchExecExchange)
inputWrapper.execNode)
+ .distribution.getType() ==
RelDistribution.Type.SINGLETON);
+
+ if (shouldRemove) {
+ wrapper.removeFromGroup();
+ }
+ }
+ }
+
+ private boolean isTailOfMultipleInputGroup(ExecNodeWrapper wrapper) {
Review comment:
Should `Tail` be renamed to `Header` here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]