zhuzhurk commented on code in PR #25822:
URL: https://github.com/apache/flink/pull/25822#discussion_r1903910976
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamEdgeUpdateRequestInfo.java:
##########
@@ -29,6 +29,7 @@ public class StreamEdgeUpdateRequestInfo {
private final Integer targetId;
private StreamPartitioner<?> outputPartitioner;
+ private int typeNumber;
Review Comment:
A comment is needed to explain what it means when it is 0.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -166,7 +166,7 @@ public class OptimizerConfigOptions {
TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY =
key("table.optimizer.adaptive-broadcast-join.strategy")
.enumType(AdaptiveBroadcastJoinStrategy.class)
- .defaultValue(AdaptiveBroadcastJoinStrategy.NONE)
+ .defaultValue(AdaptiveBroadcastJoinStrategy.AUTO)
Review Comment:
I'd prefer this commit to be squashed into the previous one, given that the change
is not complex and no specific test is added for it.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamEdgeUpdateRequestInfo.java:
##########
@@ -41,6 +42,11 @@ public StreamEdgeUpdateRequestInfo
outputPartitioner(StreamPartitioner<?> output
return this;
}
+ public StreamEdgeUpdateRequestInfo typeNumber(int typeNumber) {
Review Comment:
Rename to `withTypeNumber`,
and rename the method above to `withOutputPartitioner`.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamNodeUpdateRequestInfo.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/** Helper class carries the data required to updates a stream edge. */
+@Internal
+public class StreamNodeUpdateRequestInfo {
+ private final Integer nodeId;
+
+ private TypeSerializer<?>[] typeSerializersIn;
+
+ public StreamNodeUpdateRequestInfo(Integer nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public StreamNodeUpdateRequestInfo typeSerializersIn(TypeSerializer<?>[]
typeSerializersIn) {
Review Comment:
Rename to `withTypeSerializersIn`.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java:
##########
@@ -155,4 +155,15 @@ public void markAsBroadcastJoin(boolean canBroadcast,
boolean leftIsBuild) {
this.isBroadcastJoin = canBroadcast;
this.leftIsBuild = leftIsBuild;
}
+
+ @Override
+ public boolean shouldReorderInputs() {
+ // Sort merge join requires the left side to be read first if the
broadcast threshold is not
+ // met.
+ if (!isBroadcastJoin && originalJoin == OperatorType.SortMergeJoin) {
Review Comment:
Does this mean the inputs may be reordered even when a ShuffledHashJoin is
not converted into a broadcast join?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be
placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends
BaseAdaptiveJoinOperatorOptimizationStrategy {
Review Comment:
Please add comments explaining why this post-processing strategy is needed.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java:
##########
@@ -161,21 +216,23 @@ private boolean
validateStreamEdgeUpdateRequest(StreamEdgeUpdateRequestInfo requ
}
// Modification is not allowed when the subscribing output is reused.
- Map<StreamEdge, NonChainedOutput> opIntermediateOutputs =
- opIntermediateOutputsCaches.get(sourceNodeId);
- NonChainedOutput output =
- opIntermediateOutputs != null ?
opIntermediateOutputs.get(targetEdge) : null;
- if (output != null) {
- Set<StreamEdge> consumerStreamEdges =
- opIntermediateOutputs.entrySet().stream()
- .filter(entry -> entry.getValue().equals(output))
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
- if (consumerStreamEdges.size() != 1) {
- LOG.info(
- "Skip modifying edge {} because the subscribing output
is reused.",
- targetEdge);
- return false;
+ if (requestInfo.getOutputPartitioner() != null) {
Review Comment:
Why is this change needed?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be
placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends
BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+ // For hash join, reorder the join node inputs so the build side
is read first.
+ if (adaptiveJoin.shouldReorderInputs()) {
+ if (!context.modifyStreamEdge(
+
generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+ || !context.modifyStreamNode(
+
generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+ throw new RuntimeException(
+ "Unexpected error occurs while reordering the
inputs "
+ + "of the adaptive join node, potentially
leading to data inaccuracies. "
+ + "Exceptions will be thrown.");
+ }
+ }
+
+ // Lazily generate OperatorFactory for adaptive join operator.
+ ReadableConfig config =
context.getStreamGraph().getConfiguration();
+ ClassLoader userClassLoader =
context.getStreamGraph().getUserClassLoader();
+ adaptiveJoin.genOperatorFactory(userClassLoader, config);
+ }
+ }
+
+ private static List<StreamEdgeUpdateRequestInfo>
generateStreamEdgeUpdateRequestInfos(
Review Comment:
Rename to `generateStreamEdgeUpdateRequestInfosForInputsReordered`.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be
placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends
BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+ // For hash join, reorder the join node inputs so the build side
is read first.
+ if (adaptiveJoin.shouldReorderInputs()) {
+ if (!context.modifyStreamEdge(
+
generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+ || !context.modifyStreamNode(
+
generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+ throw new RuntimeException(
+ "Unexpected error occurs while reordering the
inputs "
+ + "of the adaptive join node, potentially
leading to data inaccuracies. "
+ + "Exceptions will be thrown.");
+ }
+ }
+
+ // Lazily generate OperatorFactory for adaptive join operator.
Review Comment:
Suggest rewording this comment to: "Generate the OperatorFactory for the adaptive join
operator after the inputs are reordered."
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be
placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends
BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+ // For hash join, reorder the join node inputs so the build side
is read first.
+ if (adaptiveJoin.shouldReorderInputs()) {
+ if (!context.modifyStreamEdge(
+
generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+ || !context.modifyStreamNode(
+
generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+ throw new RuntimeException(
+ "Unexpected error occurs while reordering the
inputs "
+ + "of the adaptive join node, potentially
leading to data inaccuracies. "
+ + "Exceptions will be thrown.");
+ }
+ }
+
+ // Lazily generate OperatorFactory for adaptive join operator.
+ ReadableConfig config =
context.getStreamGraph().getConfiguration();
+ ClassLoader userClassLoader =
context.getStreamGraph().getUserClassLoader();
+ adaptiveJoin.genOperatorFactory(userClassLoader, config);
+ }
+ }
+
+ private static List<StreamEdgeUpdateRequestInfo>
generateStreamEdgeUpdateRequestInfos(
+ ImmutableStreamNode adaptiveJoinNode) {
+ List<StreamEdgeUpdateRequestInfo> streamEdgeUpdateRequestInfos = new
ArrayList<>();
+ for (ImmutableStreamEdge inEdge : adaptiveJoinNode.getInEdges()) {
+ StreamEdgeUpdateRequestInfo streamEdgeUpdateRequestInfo =
+ new StreamEdgeUpdateRequestInfo(
+ inEdge.getEdgeId(), inEdge.getSourceId(),
inEdge.getTargetId());
+ streamEdgeUpdateRequestInfo.typeNumber(inEdge.getTypeNumber() == 1
? 2 : 1);
Review Comment:
Why not make typeNumber part of the constructor of
`StreamEdgeUpdateRequestInfo`?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+ extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+ private Long broadcastThreshold;
+
+ private Map<Integer, Map<Integer, Long>>
aggregatedProducedBytesByTypeNumberAndNodeId;
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ initialize(context.getStreamGraph().getConfiguration());
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+ IntermediateDataSetID relatedDataSetID =
+
context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+ long producedBytes =
+
operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+ .filter(
+ blockingResultInfo ->
+ blockingResultInfo.getResultId()
== relatedDataSetID)
+ .mapToLong(BlockingResultInfo::getNumBytesProduced)
+ .sum();
+ aggregatedProducedBytesByTypeNumber(
Review Comment:
Rename `aggregatedProducedBytesByTypeNumber` -> `aggregatedInputBytesByTypeNumber`.
For the join node, these are input bytes rather than produced bytes.
The same applies to `aggregatedProducedBytesByTypeNumberAndNodeId`.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+ extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+ private Long broadcastThreshold;
+
+ private Map<Integer, Map<Integer, Long>>
aggregatedProducedBytesByTypeNumberAndNodeId;
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ initialize(context.getStreamGraph().getConfiguration());
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+ IntermediateDataSetID relatedDataSetID =
+
context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+ long producedBytes =
+
operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+ .filter(
+ blockingResultInfo ->
+ blockingResultInfo.getResultId()
== relatedDataSetID)
+ .mapToLong(BlockingResultInfo::getNumBytesProduced)
+ .sum();
+ aggregatedProducedBytesByTypeNumber(
+ adaptiveJoinNode, upstreamEdge.getTypeNumber(),
producedBytes);
+ }
+
+ // If all upstream nodes have finished, we attempt to optimize the
AdaptiveJoin node.
+ if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+ Long leftInputSize =
+ aggregatedProducedBytesByTypeNumberAndNodeId
+ .get(adaptiveJoinNode.getId())
+ .get(1);
+ Long rightInputSize =
+ aggregatedProducedBytesByTypeNumberAndNodeId
+ .get(adaptiveJoinNode.getId())
+ .get(2);
+ Preconditions.checkArgument(
+ leftInputSize != null && rightInputSize != null,
Review Comment:
It's better to check `leftInputSize` and `rightInputSize` separately so that
the exception shows which side is null.
Maybe change the error message to 'Left input bytes of adaptive join %s are
unknown, which is unexpected.' (note that `Preconditions` formats messages with `%s`, not `{}`).
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+ extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+ private Long broadcastThreshold;
+
+ private Map<Integer, Map<Integer, Long>>
aggregatedProducedBytesByTypeNumberAndNodeId;
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ initialize(context.getStreamGraph().getConfiguration());
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+ IntermediateDataSetID relatedDataSetID =
+
context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+ long producedBytes =
+
operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+ .filter(
+ blockingResultInfo ->
+ blockingResultInfo.getResultId()
== relatedDataSetID)
Review Comment:
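Use `relatedDataSetId.equals(blockingResultInfo.getResultId())` instead of `==`:
`IntermediateDataSetID` is an object, so `==` compares references rather than values.
Also rename `relatedDataSetID` -> `relatedDataSetId`.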
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be
placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends
BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+ // For hash join, reorder the join node inputs so the build side
is read first.
+ if (adaptiveJoin.shouldReorderInputs()) {
+ if (!context.modifyStreamEdge(
+
generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+ || !context.modifyStreamNode(
+
generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+ throw new RuntimeException(
+ "Unexpected error occurs while reordering the
inputs "
+ + "of the adaptive join node, potentially
leading to data inaccuracies. "
+ + "Exceptions will be thrown.");
+ }
+ }
+
+ // Lazily generate OperatorFactory for adaptive join operator.
+ ReadableConfig config =
context.getStreamGraph().getConfiguration();
+ ClassLoader userClassLoader =
context.getStreamGraph().getUserClassLoader();
+ adaptiveJoin.genOperatorFactory(userClassLoader, config);
+ }
+ }
+
+ private static List<StreamEdgeUpdateRequestInfo>
generateStreamEdgeUpdateRequestInfos(
+ ImmutableStreamNode adaptiveJoinNode) {
+ List<StreamEdgeUpdateRequestInfo> streamEdgeUpdateRequestInfos = new
ArrayList<>();
+ for (ImmutableStreamEdge inEdge : adaptiveJoinNode.getInEdges()) {
+ StreamEdgeUpdateRequestInfo streamEdgeUpdateRequestInfo =
+ new StreamEdgeUpdateRequestInfo(
+ inEdge.getEdgeId(), inEdge.getSourceId(),
inEdge.getTargetId());
+ streamEdgeUpdateRequestInfo.typeNumber(inEdge.getTypeNumber() == 1
? 2 : 1);
+ streamEdgeUpdateRequestInfos.add(streamEdgeUpdateRequestInfo);
+ }
+ return streamEdgeUpdateRequestInfos;
+ }
+
+ private List<StreamNodeUpdateRequestInfo>
generateStreamNodeUpdateRequestInfos(
+ ImmutableStreamNode modifiedNode) {
+ List<StreamNodeUpdateRequestInfo> streamEdgeUpdateRequestInfos = new
ArrayList<>();
+
+ TypeSerializer<?>[] typeSerializers =
modifiedNode.getTypeSerializersIn();
+ Preconditions.checkState(
+ typeSerializers.length == 2,
+ String.format(
+ "Adaptive join currently only supports two "
+ + "inputs, but the join node [%s] has received
%s inputs.",
+ modifiedNode.getId(), typeSerializers.length));
+ TypeSerializer<?>[] swappedTypeSerializers = new TypeSerializer<?>[2];
+ swappedTypeSerializers[0] = typeSerializers[1];
+ swappedTypeSerializers[1] = typeSerializers[0];
+ StreamNodeUpdateRequestInfo requestInfo =
+ new StreamNodeUpdateRequestInfo(modifiedNode.getId())
+ .typeSerializersIn(swappedTypeSerializers);
Review Comment:
If it always sets this field immediately after the requestInfo is created,
it's better to add it to the constructor so that the field can be final.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+ extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+ private Long broadcastThreshold;
+
+ private Map<Integer, Map<Integer, Long>>
aggregatedProducedBytesByTypeNumberAndNodeId;
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ initialize(context.getStreamGraph().getConfiguration());
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+ IntermediateDataSetID relatedDataSetID =
+
context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+ long producedBytes =
+
operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+ .filter(
+ blockingResultInfo ->
+ blockingResultInfo.getResultId()
== relatedDataSetID)
+ .mapToLong(BlockingResultInfo::getNumBytesProduced)
+ .sum();
+ aggregatedProducedBytesByTypeNumber(
+ adaptiveJoinNode, upstreamEdge.getTypeNumber(),
producedBytes);
+ }
+
+ // If all upstream nodes have finished, we attempt to optimize the
AdaptiveJoin node.
+ if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+ Long leftInputSize =
+ aggregatedProducedBytesByTypeNumberAndNodeId
+ .get(adaptiveJoinNode.getId())
+ .get(1);
+ Long rightInputSize =
+ aggregatedProducedBytesByTypeNumberAndNodeId
+ .get(adaptiveJoinNode.getId())
+ .get(2);
+ Preconditions.checkArgument(
+ leftInputSize != null && rightInputSize != null,
+ "Adaptive join node currently supports only two inputs, "
+ + "but received input bytes with left [%s] and
right [%s] for stream "
+ + "node id [%s].",
+ leftInputSize,
+ rightInputSize,
+ adaptiveJoinNode.getId());
+
+ boolean leftSizeSmallerThanThreshold = leftInputSize <=
broadcastThreshold;
+ boolean rightSizeSmallerThanThreshold = rightInputSize <=
broadcastThreshold;
+ boolean leftSmallerThanRight = leftInputSize < rightInputSize;
+ FlinkJoinType joinType = adaptiveJoin.getJoinType();
+ boolean canBeBroadcast;
+ boolean leftIsBuild;
+ switch (joinType) {
+ case RIGHT:
+ // For a right outer join, if the left side can be
broadcast, then the left side
+ // is
+ // always the build side; otherwise, the smaller side is
the build side.
+ canBeBroadcast = leftSizeSmallerThanThreshold;
+ leftIsBuild = true;
+ break;
+ case INNER:
+ canBeBroadcast = leftSizeSmallerThanThreshold ||
rightSizeSmallerThanThreshold;
+ leftIsBuild = leftSmallerThanRight;
+ break;
+ case LEFT:
+ case SEMI:
+ case ANTI:
+ // For left outer / semi / anti join, if the right side
can be broadcast, then
+ // the
+ // right side is always the build side; otherwise, the
smaller side is the build
+ // side.
+ canBeBroadcast = rightSizeSmallerThanThreshold;
+ leftIsBuild = false;
+ break;
+ case FULL:
+ default:
+ throw new RuntimeException(String.format("Unexpected join
type %s.", joinType));
+ }
+
+ boolean isBroadcast = false;
+ if (canBeBroadcast) {
+ isBroadcast =
+ tryModifyStreamEdgesForBroadcastJoin(
+ adaptiveJoinNode.getInEdges(), context,
leftIsBuild);
+
+ if (isBroadcast) {
+ LOG.info(
+ "The {} input data size of the join node [{}] is
small enough which is {} bytes, "
+ + "adaptively convert it to a broadcast
hash join.",
Review Comment:
Maybe 'The {} input data size of the join node [{}] is small enough,
adaptively convert it to a broadcast hash join. Broadcast threshold bytes: {},
left input bytes: {}, right input bytes: {}.'
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be
placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends
BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+ // For hash join, reorder the join node inputs so the build side
is read first.
+ if (adaptiveJoin.shouldReorderInputs()) {
+ if (!context.modifyStreamEdge(
+
generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+ || !context.modifyStreamNode(
+
generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+ throw new RuntimeException(
+ "Unexpected error occurs while reordering the
inputs "
+ + "of the adaptive join node, potentially
leading to data inaccuracies. "
+ + "Exceptions will be thrown.");
+ }
+ }
+
+ // Lazily generate OperatorFactory for adaptive join operator.
+ ReadableConfig config =
context.getStreamGraph().getConfiguration();
+ ClassLoader userClassLoader =
context.getStreamGraph().getUserClassLoader();
+ adaptiveJoin.genOperatorFactory(userClassLoader, config);
+ }
+ }
+
+ private static List<StreamEdgeUpdateRequestInfo>
generateStreamEdgeUpdateRequestInfos(
+ ImmutableStreamNode adaptiveJoinNode) {
+ List<StreamEdgeUpdateRequestInfo> streamEdgeUpdateRequestInfos = new
ArrayList<>();
+ for (ImmutableStreamEdge inEdge : adaptiveJoinNode.getInEdges()) {
+ StreamEdgeUpdateRequestInfo streamEdgeUpdateRequestInfo =
+ new StreamEdgeUpdateRequestInfo(
+ inEdge.getEdgeId(), inEdge.getSourceId(),
inEdge.getTargetId());
+ streamEdgeUpdateRequestInfo.typeNumber(inEdge.getTypeNumber() == 1
? 2 : 1);
+ streamEdgeUpdateRequestInfos.add(streamEdgeUpdateRequestInfo);
+ }
+ return streamEdgeUpdateRequestInfos;
+ }
+
+ private List<StreamNodeUpdateRequestInfo>
generateStreamNodeUpdateRequestInfos(
Review Comment:
-> generateStreamNodeUpdateRequestInfosForInputsReordered
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/BaseAdaptiveJoinOperatorOptimizationStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import
org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizationStrategy;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** The base stream graph optimization strategy class for adaptive join
operator. */
+public abstract class BaseAdaptiveJoinOperatorOptimizationStrategy
+ implements StreamGraphOptimizationStrategy {
+
+ protected void visitDownstreamAdaptiveJoinNode(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ ImmutableStreamGraph streamGraph = context.getStreamGraph();
+ List<Integer> finishedStreamNodeIds =
operatorsFinished.getFinishedStreamNodeIds();
+ Map<ImmutableStreamNode, List<ImmutableStreamEdge>> nodesWithOutEdges
= new HashMap<>();
Review Comment:
The name is misleading because for the join node the edges are in-edges.
Maybe `joinNodesWithInEdges`?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+ extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+ private Long broadcastThreshold;
+
+ private Map<Integer, Map<Integer, Long>>
aggregatedProducedBytesByTypeNumberAndNodeId;
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ initialize(context.getStreamGraph().getConfiguration());
Review Comment:
It looks to me like it's better to introduce an `initialize(StreamGraphContext
context)` method in `StreamGraphOptimizationStrategy`. WDYT?
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+ extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+ private Long broadcastThreshold;
+
+ private Map<Integer, Map<Integer, Long>>
aggregatedProducedBytesByTypeNumberAndNodeId;
+
+ @Override
+ public boolean onOperatorsFinished(
+ OperatorsFinished operatorsFinished, StreamGraphContext context) {
+ initialize(context.getStreamGraph().getConfiguration());
+ visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+ return true;
+ }
+
+ @Override
+ protected void tryOptimizeAdaptiveJoin(
+ OperatorsFinished operatorsFinished,
+ StreamGraphContext context,
+ ImmutableStreamNode adaptiveJoinNode,
+ List<ImmutableStreamEdge> upstreamStreamEdges,
+ AdaptiveJoin adaptiveJoin) {
+ for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+ IntermediateDataSetID relatedDataSetID =
+
context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+ long producedBytes =
+
operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+ .filter(
+ blockingResultInfo ->
+ blockingResultInfo.getResultId()
== relatedDataSetID)
+ .mapToLong(BlockingResultInfo::getNumBytesProduced)
+ .sum();
+ aggregatedProducedBytesByTypeNumber(
+ adaptiveJoinNode, upstreamEdge.getTypeNumber(),
producedBytes);
+ }
+
+ // If all upstream nodes have finished, we attempt to optimize the
AdaptiveJoin node.
+ if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+ Long leftInputSize =
+ aggregatedProducedBytesByTypeNumberAndNodeId
+ .get(adaptiveJoinNode.getId())
+ .get(1);
+ Long rightInputSize =
+ aggregatedProducedBytesByTypeNumberAndNodeId
+ .get(adaptiveJoinNode.getId())
+ .get(2);
+ Preconditions.checkArgument(
Review Comment:
checkArgument -> checkState
Also, it's better to statically import `Preconditions.checkState` up front.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]