zhuzhurk commented on code in PR #25822:
URL: https://github.com/apache/flink/pull/25822#discussion_r1903910976


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamEdgeUpdateRequestInfo.java:
##########
@@ -29,6 +29,7 @@ public class StreamEdgeUpdateRequestInfo {
     private final Integer targetId;
 
     private StreamPartitioner<?> outputPartitioner;
+    private int typeNumber;

Review Comment:
   A comment is needed to explain what it means if it is 0.

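
   For example, the field could be documented like this (a sketch only; whether 0 means "unset, keep the current type number" has to match the actual implementation):

   ```java
   /**
    * The type number of the modified edge. A value of 0 means the type number
    * is not set and the edge's existing type number is kept unchanged.
    * (Hypothetical wording; please adjust to the real semantics.)
    */
   private int typeNumber;
   ```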


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -166,7 +166,7 @@ public class OptimizerConfigOptions {
             TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY =
                     key("table.optimizer.adaptive-broadcast-join.strategy")
                             .enumType(AdaptiveBroadcastJoinStrategy.class)
-                            .defaultValue(AdaptiveBroadcastJoinStrategy.NONE)
+                            .defaultValue(AdaptiveBroadcastJoinStrategy.AUTO)

Review Comment:
   I prefer this commit to be part of the previous one, given that the change is not complex and there is no specific test added for it.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamEdgeUpdateRequestInfo.java:
##########
@@ -41,6 +42,11 @@ public StreamEdgeUpdateRequestInfo outputPartitioner(StreamPartitioner<?> output
         return this;
     }
 
+    public StreamEdgeUpdateRequestInfo typeNumber(int typeNumber) {

Review Comment:
   -> withTypeNumber
   
   and for the above method -> withOutputPartitioner

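
   i.e. something like the following, keeping the fluent style (a sketch based on the setters shown in this hunk):

   ```java
   public StreamEdgeUpdateRequestInfo withOutputPartitioner(StreamPartitioner<?> outputPartitioner) {
       this.outputPartitioner = outputPartitioner;
       return this;
   }

   public StreamEdgeUpdateRequestInfo withTypeNumber(int typeNumber) {
       this.typeNumber = typeNumber;
       return this;
   }
   ```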


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamNodeUpdateRequestInfo.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/** Helper class that carries the data required to update a stream node. */
+@Internal
+public class StreamNodeUpdateRequestInfo {
+    private final Integer nodeId;
+
+    private TypeSerializer<?>[] typeSerializersIn;
+
+    public StreamNodeUpdateRequestInfo(Integer nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public StreamNodeUpdateRequestInfo typeSerializersIn(TypeSerializer<?>[] typeSerializersIn) {

Review Comment:
   -> withTypeSerializersIn



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java:
##########
@@ -155,4 +155,15 @@ public void markAsBroadcastJoin(boolean canBroadcast, boolean leftIsBuild) {
         this.isBroadcastJoin = canBroadcast;
         this.leftIsBuild = leftIsBuild;
     }
+
+    @Override
+    public boolean shouldReorderInputs() {
+        // Sort merge join requires the left side to be read first if the broadcast threshold is not
+        // met.
+        if (!isBroadcastJoin && originalJoin == OperatorType.SortMergeJoin) {

Review Comment:
   This means it may re-order inputs even if a ShuffledHashJoin is not converted into a broadcast-join?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends BaseAdaptiveJoinOperatorOptimizationStrategy {

Review Comment:
   Comments are needed to explain why this class is needed.
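
   For example, a sketch of such a class comment (the rationale is inferred from this PR and should be confirmed by the author):

   ```java
   /**
    * The post-processing phase of adaptive join optimization. It reorders the join
    * inputs so that the build side is read first and then generates the final
    * operator factory. It must run after all other adaptive join optimization
    * strategies, because the operator factory can only be generated once those
    * strategies have decided the final shape of the join.
    */
   ```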



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java:
##########
@@ -161,21 +216,23 @@ private boolean validateStreamEdgeUpdateRequest(StreamEdgeUpdateRequestInfo requ
         }
 
         // Modification is not allowed when the subscribing output is reused.
-        Map<StreamEdge, NonChainedOutput> opIntermediateOutputs =
-                opIntermediateOutputsCaches.get(sourceNodeId);
-        NonChainedOutput output =
-                opIntermediateOutputs != null ? opIntermediateOutputs.get(targetEdge) : null;
-        if (output != null) {
-            Set<StreamEdge> consumerStreamEdges =
-                    opIntermediateOutputs.entrySet().stream()
-                            .filter(entry -> entry.getValue().equals(output))
-                            .map(Map.Entry::getKey)
-                            .collect(Collectors.toSet());
-            if (consumerStreamEdges.size() != 1) {
-                LOG.info(
-                        "Skip modifying edge {} because the subscribing output is reused.",
-                        targetEdge);
-                return false;
+        if (requestInfo.getOutputPartitioner() != null) {

Review Comment:
   Why is this change needed?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+            // For hash join, reorder the join node inputs so the build side is read first.
+            if (adaptiveJoin.shouldReorderInputs()) {
+                if (!context.modifyStreamEdge(
+                                generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+                        || !context.modifyStreamNode(
+                                generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+                    throw new RuntimeException(
+                            "Unexpected error occurs while reordering the inputs "
+                                    + "of the adaptive join node, potentially leading to data inaccuracies. "
+                                    + "Exceptions will be thrown.");
+                }
+            }
+
+            // Lazily generate OperatorFactory for adaptive join operator.
+            ReadableConfig config = context.getStreamGraph().getConfiguration();
+            ClassLoader userClassLoader = context.getStreamGraph().getUserClassLoader();
+            adaptiveJoin.genOperatorFactory(userClassLoader, config);
+        }
+    }
+
+    private static List<StreamEdgeUpdateRequestInfo> generateStreamEdgeUpdateRequestInfos(

Review Comment:
   -> generateStreamEdgeUpdateRequestInfosForInputsReordered



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+            // For hash join, reorder the join node inputs so the build side is read first.
+            if (adaptiveJoin.shouldReorderInputs()) {
+                if (!context.modifyStreamEdge(
+                                generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+                        || !context.modifyStreamNode(
+                                generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+                    throw new RuntimeException(
+                            "Unexpected error occurs while reordering the inputs "
+                                    + "of the adaptive join node, potentially leading to data inaccuracies. "
+                                    + "Exceptions will be thrown.");
+                }
+            }
+
+            // Lazily generate OperatorFactory for adaptive join operator.

Review Comment:
   -> generate OperatorFactory for adaptive join operator after inputs are reordered.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+            // For hash join, reorder the join node inputs so the build side is read first.
+            if (adaptiveJoin.shouldReorderInputs()) {
+                if (!context.modifyStreamEdge(
+                                generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+                        || !context.modifyStreamNode(
+                                generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+                    throw new RuntimeException(
+                            "Unexpected error occurs while reordering the inputs "
+                                    + "of the adaptive join node, potentially leading to data inaccuracies. "
+                                    + "Exceptions will be thrown.");
+                }
+            }
+
+            // Lazily generate OperatorFactory for adaptive join operator.
+            ReadableConfig config = context.getStreamGraph().getConfiguration();
+            ClassLoader userClassLoader = context.getStreamGraph().getUserClassLoader();
+            adaptiveJoin.genOperatorFactory(userClassLoader, config);
+        }
+    }
+
+    private static List<StreamEdgeUpdateRequestInfo> generateStreamEdgeUpdateRequestInfos(
+            ImmutableStreamNode adaptiveJoinNode) {
+        List<StreamEdgeUpdateRequestInfo> streamEdgeUpdateRequestInfos = new ArrayList<>();
+        for (ImmutableStreamEdge inEdge : adaptiveJoinNode.getInEdges()) {
+            StreamEdgeUpdateRequestInfo streamEdgeUpdateRequestInfo =
+                    new StreamEdgeUpdateRequestInfo(
+                            inEdge.getEdgeId(), inEdge.getSourceId(), inEdge.getTargetId());
+            streamEdgeUpdateRequestInfo.typeNumber(inEdge.getTypeNumber() == 1 ? 2 : 1);

Review Comment:
   Why not make typeNumber part of the constructor of `StreamEdgeUpdateRequestInfo`?
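
   i.e. roughly the following (a sketch; the type of the edge id is assumed to be `String` here):

   ```java
   public StreamEdgeUpdateRequestInfo(
           String edgeId, Integer sourceId, Integer targetId, int typeNumber) {
       this.edgeId = edgeId;
       this.sourceId = sourceId;
       this.targetId = targetId;
       // With the type number fixed at construction time, the field can be final.
       this.typeNumber = typeNumber;
   }
   ```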



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+        extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+    private Long broadcastThreshold;
+
+    private Map<Integer, Map<Integer, Long>> aggregatedProducedBytesByTypeNumberAndNodeId;
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        initialize(context.getStreamGraph().getConfiguration());
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+            IntermediateDataSetID relatedDataSetID =
+                    context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+            long producedBytes =
+                    operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+                            .filter(
+                                    blockingResultInfo ->
+                                            blockingResultInfo.getResultId() == relatedDataSetID)
+                            .mapToLong(BlockingResultInfo::getNumBytesProduced)
+                            .sum();
+            aggregatedProducedBytesByTypeNumber(

Review Comment:
   aggregatedProducedBytesByTypeNumber -> aggregatedInputBytesByTypeNumber
   
   For the joinNode, it is input bytes instead of produced bytes.
   
   This also applies to `aggregatedProducedBytesByTypeNumberAndNodeId`.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+        extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+    private Long broadcastThreshold;
+
+    private Map<Integer, Map<Integer, Long>> aggregatedProducedBytesByTypeNumberAndNodeId;
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        initialize(context.getStreamGraph().getConfiguration());
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+            IntermediateDataSetID relatedDataSetID =
+                    context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+            long producedBytes =
+                    operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+                            .filter(
+                                    blockingResultInfo ->
+                                            blockingResultInfo.getResultId() == relatedDataSetID)
+                            .mapToLong(BlockingResultInfo::getNumBytesProduced)
+                            .sum();
+            aggregatedProducedBytesByTypeNumber(
+                    adaptiveJoinNode, upstreamEdge.getTypeNumber(), producedBytes);
+        }
+
+        // If all upstream nodes have finished, we attempt to optimize the AdaptiveJoin node.
+        if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+            Long leftInputSize =
+                    aggregatedProducedBytesByTypeNumberAndNodeId
+                            .get(adaptiveJoinNode.getId())
+                            .get(1);
+            Long rightInputSize =
+                    aggregatedProducedBytesByTypeNumberAndNodeId
+                            .get(adaptiveJoinNode.getId())
+                            .get(2);
+            Preconditions.checkArgument(
+                    leftInputSize != null && rightInputSize != null,

Review Comment:
   It's better to check `leftInputSize` and `rightInputSize` separately so that the exception can show which side is null.

   Maybe change the error message to 'Left input bytes of adaptive join {} is unknown, which is unexpected.'
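
   i.e. roughly (a sketch; this also uses `checkState`, as suggested in a later comment):

   ```java
   Preconditions.checkState(
           leftInputSize != null,
           "Left input bytes of adaptive join [%s] is unknown, which is unexpected.",
           adaptiveJoinNode.getId());
   Preconditions.checkState(
           rightInputSize != null,
           "Right input bytes of adaptive join [%s] is unknown, which is unexpected.",
           adaptiveJoinNode.getId());
   ```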



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+        extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+    private Long broadcastThreshold;
+
+    private Map<Integer, Map<Integer, Long>> aggregatedProducedBytesByTypeNumberAndNodeId;
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        initialize(context.getStreamGraph().getConfiguration());
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+            IntermediateDataSetID relatedDataSetID =
+                    context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+            long producedBytes =
+                    operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+                            .filter(
+                                    blockingResultInfo ->
+                                            blockingResultInfo.getResultId() == relatedDataSetID)

Review Comment:
   relatedDataSetId.equals(blockingResultInfo.getResultId())
   
   relatedDataSetID -> relatedDataSetId
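
   Applied to the statement above (same code, with the renamed variable and the value-equality comparison):

   ```java
   long producedBytes =
           operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
                   .filter(
                           blockingResultInfo ->
                                   relatedDataSetId.equals(blockingResultInfo.getResultId()))
                   .mapToLong(BlockingResultInfo::getNumBytesProduced)
                   .sum();
   ```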



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+            // For hash join, reorder the join node inputs so the build side is read first.
+            if (adaptiveJoin.shouldReorderInputs()) {
+                if (!context.modifyStreamEdge(
+                                generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+                        || !context.modifyStreamNode(
+                                generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+                    throw new RuntimeException(
+                            "Unexpected error occurs while reordering the inputs "
+                                    + "of the adaptive join node, potentially leading to data inaccuracies. "
+                                    + "Exceptions will be thrown.");
+                }
+            }
+
+            // Lazily generate OperatorFactory for adaptive join operator.
+            ReadableConfig config = context.getStreamGraph().getConfiguration();
+            ClassLoader userClassLoader = context.getStreamGraph().getUserClassLoader();
+            adaptiveJoin.genOperatorFactory(userClassLoader, config);
+        }
+    }
+
+    private static List<StreamEdgeUpdateRequestInfo> generateStreamEdgeUpdateRequestInfos(
+            ImmutableStreamNode adaptiveJoinNode) {
+        List<StreamEdgeUpdateRequestInfo> streamEdgeUpdateRequestInfos = new ArrayList<>();
+        for (ImmutableStreamEdge inEdge : adaptiveJoinNode.getInEdges()) {
+            StreamEdgeUpdateRequestInfo streamEdgeUpdateRequestInfo =
+                    new StreamEdgeUpdateRequestInfo(
+                            inEdge.getEdgeId(), inEdge.getSourceId(), inEdge.getTargetId());
+            streamEdgeUpdateRequestInfo.typeNumber(inEdge.getTypeNumber() == 1 ? 2 : 1);
+            streamEdgeUpdateRequestInfos.add(streamEdgeUpdateRequestInfo);
+        }
+        return streamEdgeUpdateRequestInfos;
+    }
+
+    private List<StreamNodeUpdateRequestInfo> generateStreamNodeUpdateRequestInfos(
+            ImmutableStreamNode modifiedNode) {
+        List<StreamNodeUpdateRequestInfo> streamEdgeUpdateRequestInfos = new ArrayList<>();
+
+        TypeSerializer<?>[] typeSerializers = modifiedNode.getTypeSerializersIn();
+        Preconditions.checkState(
+                typeSerializers.length == 2,
+                String.format(
+                        "Adaptive join currently only supports two "
+                                + "inputs, but the join node [%s] has received %s inputs.",
+                        modifiedNode.getId(), typeSerializers.length));
+        TypeSerializer<?>[] swappedTypeSerializers = new TypeSerializer<?>[2];
+        swappedTypeSerializers[0] = typeSerializers[1];
+        swappedTypeSerializers[1] = typeSerializers[0];
+        StreamNodeUpdateRequestInfo requestInfo =
+                new StreamNodeUpdateRequestInfo(modifiedNode.getId())
+                        .typeSerializersIn(swappedTypeSerializers);

Review Comment:
   If it always sets this field immediately after the requestInfo is created, it's better to add it to the constructor so that the field can be final.
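
   A sketch of the constructor-based variant (field names taken from the hunk above, imports elided):

   ```java
   @Internal
   public class StreamNodeUpdateRequestInfo {
       private final Integer nodeId;
       private final TypeSerializer<?>[] typeSerializersIn;

       public StreamNodeUpdateRequestInfo(Integer nodeId, TypeSerializer<?>[] typeSerializersIn) {
           this.nodeId = nodeId;
           this.typeSerializersIn = typeSerializersIn;
       }
   }
   ```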



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+        extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+    private Long broadcastThreshold;
+
+    private Map<Integer, Map<Integer, Long>> aggregatedProducedBytesByTypeNumberAndNodeId;
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        initialize(context.getStreamGraph().getConfiguration());
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+            IntermediateDataSetID relatedDataSetID =
+                    context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+            long producedBytes =
+                    operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+                            .filter(
+                                    blockingResultInfo ->
+                                            blockingResultInfo.getResultId() == relatedDataSetID)
+                            .mapToLong(BlockingResultInfo::getNumBytesProduced)
+                            .sum();
+            aggregatedProducedBytesByTypeNumber(
+                    adaptiveJoinNode, upstreamEdge.getTypeNumber(), producedBytes);
+        }
+
+        // If all upstream nodes have finished, we attempt to optimize the AdaptiveJoin node.
+        if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+            Long leftInputSize =
+                    aggregatedProducedBytesByTypeNumberAndNodeId
+                            .get(adaptiveJoinNode.getId())
+                            .get(1);
+            Long rightInputSize =
+                    aggregatedProducedBytesByTypeNumberAndNodeId
+                            .get(adaptiveJoinNode.getId())
+                            .get(2);
+            Preconditions.checkArgument(
+                    leftInputSize != null && rightInputSize != null,
+                    "Adaptive join node currently supports only two inputs, "
+                            + "but received input bytes with left [%s] and right [%s] for stream "
+                            + "node id [%s].",
+                    leftInputSize,
+                    rightInputSize,
+                    adaptiveJoinNode.getId());
+
+            boolean leftSizeSmallerThanThreshold = leftInputSize <= broadcastThreshold;
+            boolean rightSizeSmallerThanThreshold = rightInputSize <= broadcastThreshold;
+            boolean leftSmallerThanRight = leftInputSize < rightInputSize;
+            FlinkJoinType joinType = adaptiveJoin.getJoinType();
+            boolean canBeBroadcast;
+            boolean leftIsBuild;
+            switch (joinType) {
+                case RIGHT:
+                    // For a right outer join, if the left side can be broadcast, then the left side
+                    // is always the build side; otherwise, the smaller side is the build side.
+                    canBeBroadcast = leftSizeSmallerThanThreshold;
+                    leftIsBuild = true;
+                    break;
+                case INNER:
+                    canBeBroadcast = leftSizeSmallerThanThreshold || rightSizeSmallerThanThreshold;
+                    leftIsBuild = leftSmallerThanRight;
+                    break;
+                case LEFT:
+                case SEMI:
+                case ANTI:
+                    // For left outer / semi / anti join, if the right side can be broadcast, then
+                    // the right side is always the build side; otherwise, the smaller side is the
+                    // build side.
+                    canBeBroadcast = rightSizeSmallerThanThreshold;
+                    leftIsBuild = false;
+                    break;
+                case FULL:
+                default:
+                    throw new RuntimeException(String.format("Unexpected join type %s.", joinType));
+            }
+
+            boolean isBroadcast = false;
+            if (canBeBroadcast) {
+                isBroadcast =
+                        tryModifyStreamEdgesForBroadcastJoin(
+                                adaptiveJoinNode.getInEdges(), context, leftIsBuild);
+
+                if (isBroadcast) {
+                    LOG.info(
+                            "The {} input data size of the join node [{}] is small enough which is {} bytes, "
+                                    + "adaptively convert it to a broadcast hash join.",

Review Comment:
   Maybe 'The {} input data size of the join node [{}] is small enough, adaptively convert it to a broadcast hash join. Broadcast threshold bytes: {}, left input bytes: {}, right input bytes: {}.'



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/PostProcessAdaptiveJoinStrategy.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.api.graph.util.StreamNodeUpdateRequestInfo;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The post-processing phase of adaptive join optimization, which must be placed at the end of all
+ * adaptive join optimization strategies.
+ */
+public class PostProcessAdaptiveJoinStrategy extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+            // For hash join, reorder the join node inputs so the build side is read first.
+            if (adaptiveJoin.shouldReorderInputs()) {
+                if (!context.modifyStreamEdge(
+                                generateStreamEdgeUpdateRequestInfos(adaptiveJoinNode))
+                        || !context.modifyStreamNode(
+                                generateStreamNodeUpdateRequestInfos(adaptiveJoinNode))) {
+                    throw new RuntimeException(
+                            "Unexpected error occurs while reordering the inputs "
+                                    + "of the adaptive join node, potentially leading to data inaccuracies. "
+                                    + "Exceptions will be thrown.");
+                }
+            }
+
+            // Lazily generate OperatorFactory for adaptive join operator.
+            ReadableConfig config = context.getStreamGraph().getConfiguration();
+            ClassLoader userClassLoader = context.getStreamGraph().getUserClassLoader();
+            adaptiveJoin.genOperatorFactory(userClassLoader, config);
+        }
+    }
+
+    private static List<StreamEdgeUpdateRequestInfo> generateStreamEdgeUpdateRequestInfos(
+            ImmutableStreamNode adaptiveJoinNode) {
+        List<StreamEdgeUpdateRequestInfo> streamEdgeUpdateRequestInfos = new ArrayList<>();
+        for (ImmutableStreamEdge inEdge : adaptiveJoinNode.getInEdges()) {
+            StreamEdgeUpdateRequestInfo streamEdgeUpdateRequestInfo =
+                    new StreamEdgeUpdateRequestInfo(
+                            inEdge.getEdgeId(), inEdge.getSourceId(), inEdge.getTargetId());
+            streamEdgeUpdateRequestInfo.typeNumber(inEdge.getTypeNumber() == 1 ? 2 : 1);
+            streamEdgeUpdateRequestInfos.add(streamEdgeUpdateRequestInfo);
+        }
+        return streamEdgeUpdateRequestInfos;
+    }
+
+    private List<StreamNodeUpdateRequestInfo> generateStreamNodeUpdateRequestInfos(

Review Comment:
   -> generateStreamNodeUpdateRequestInfosForInputsReordered



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/BaseAdaptiveJoinOperatorOptimizationStrategy.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizationStrategy;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamGraph;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** The base stream graph optimization strategy class for adaptive join operator. */
+public abstract class BaseAdaptiveJoinOperatorOptimizationStrategy
+        implements StreamGraphOptimizationStrategy {
+
+    protected void visitDownstreamAdaptiveJoinNode(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        ImmutableStreamGraph streamGraph = context.getStreamGraph();
+        List<Integer> finishedStreamNodeIds = operatorsFinished.getFinishedStreamNodeIds();
+        Map<ImmutableStreamNode, List<ImmutableStreamEdge>> nodesWithOutEdges = new HashMap<>();

Review Comment:
   The name is misleading because for the join node the edges are in-edges.
   Maybe `joinNodesWithInEdges`?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+        extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+    private Long broadcastThreshold;
+
+    private Map<Integer, Map<Integer, Long>> aggregatedProducedBytesByTypeNumberAndNodeId;
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        initialize(context.getStreamGraph().getConfiguration());

Review Comment:
   Looks to me it's better to introduce an `initialize(StreamGraphContext context)` method to `StreamGraphOptimizationStrategy`. WDYT?
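
   For example (a sketch; a default no-op implementation would keep existing strategies source-compatible):

   ```java
   public interface StreamGraphOptimizationStrategy {

       /** Invoked once before the first onOperatorsFinished call, e.g. to read configuration. */
       default void initialize(StreamGraphContext context) {}

       // existing methods elided
   }
   ```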



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveBroadcastJoinOptimizationStrategy.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.strategy;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
+import org.apache.flink.runtime.scheduler.adaptivebatch.OperatorsFinished;
+import org.apache.flink.streaming.api.graph.StreamGraphContext;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamEdge;
+import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode;
+import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+
+import org.apache.flink.shaded.guava32.com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** The stream graph optimization strategy of adaptive broadcast join. */
+public class AdaptiveBroadcastJoinOptimizationStrategy
+        extends BaseAdaptiveJoinOperatorOptimizationStrategy {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AdaptiveBroadcastJoinOptimizationStrategy.class);
+
+    private Long broadcastThreshold;
+
+    private Map<Integer, Map<Integer, Long>> aggregatedProducedBytesByTypeNumberAndNodeId;
+
+    @Override
+    public boolean onOperatorsFinished(
+            OperatorsFinished operatorsFinished, StreamGraphContext context) {
+        initialize(context.getStreamGraph().getConfiguration());
+        visitDownstreamAdaptiveJoinNode(operatorsFinished, context);
+
+        return true;
+    }
+
+    @Override
+    protected void tryOptimizeAdaptiveJoin(
+            OperatorsFinished operatorsFinished,
+            StreamGraphContext context,
+            ImmutableStreamNode adaptiveJoinNode,
+            List<ImmutableStreamEdge> upstreamStreamEdges,
+            AdaptiveJoin adaptiveJoin) {
+        for (ImmutableStreamEdge upstreamEdge : upstreamStreamEdges) {
+            IntermediateDataSetID relatedDataSetID =
+                    context.getConsumedIntermediateDataSetId(upstreamEdge.getEdgeId());
+            long producedBytes =
+                    operatorsFinished.getResultInfoMap().get(upstreamEdge.getSourceId()).stream()
+                            .filter(
+                                    blockingResultInfo ->
+                                            blockingResultInfo.getResultId() == relatedDataSetID)
+                            .mapToLong(BlockingResultInfo::getNumBytesProduced)
+                            .sum();
+            aggregatedProducedBytesByTypeNumber(
+                    adaptiveJoinNode, upstreamEdge.getTypeNumber(), producedBytes);
+        }
+
+        // If all upstream nodes have finished, we attempt to optimize the AdaptiveJoin node.
+        if (context.areAllUpstreamNodesFinished(adaptiveJoinNode)) {
+            Long leftInputSize =
+                    aggregatedProducedBytesByTypeNumberAndNodeId
+                            .get(adaptiveJoinNode.getId())
+                            .get(1);
+            Long rightInputSize =
+                    aggregatedProducedBytesByTypeNumberAndNodeId
+                            .get(adaptiveJoinNode.getId())
+                            .get(2);
+            Preconditions.checkArgument(

Review Comment:
   checkArgument -> checkState
   
   And it's better to statically import `Preconditions.checkState` up front.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
