This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d774cf1bdea [FLINK-37139][table-planner] Improve AdaptiveSkewedJoinOptimizationStrategy to make it effective in more scenarios (#25989)
d774cf1bdea is described below

commit d774cf1bdeac654b35993f43718e4fdc68f129cc
Author: Lei Yang <30954408+noor...@users.noreply.github.com>
AuthorDate: Thu Jan 16 13:00:32 2025 +0800

    [FLINK-37139][table-planner] Improve AdaptiveSkewedJoinOptimizationStrategy to make it effective in more scenarios (#25989)
---
 .../streaming/api/graph/AdaptiveGraphManager.java  |  5 +++++
 .../flink/streaming/api/graph/StreamEdge.java      |  3 ---
 .../api/graph/util/ImmutableStreamEdge.java        |  5 ++---
 .../sql/adaptive/AdaptiveSkewedJoinITCase.scala    |  6 ++++++
 .../AdaptiveSkewedJoinOptimizationStrategy.java    | 25 ++++++++++++++++------
 5 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
index cb327ac9600..04fa7c997a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
@@ -618,6 +618,11 @@ public class AdaptiveGraphManager
                                     streamGraph.getStreamNode(edge.getSourceId()), streamGraph))
                     || isChainable(edge, streamGraph)) {
                 edge.setPartitioner(new ForwardPartitioner<>());
+                // Currently, there is no intra-input key correlation for edges with
+                // ForwardForUnspecifiedPartitioner, so we need to set the flag to false.
+                if (partitioner instanceof ForwardForUnspecifiedPartitioner) {
+                    edge.setIntraInputKeyCorrelated(false);
+                }
             }
         }
     }
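
The hunk above rewrites the edge to a plain ForwardPartitioner and, only when the original partitioner was a ForwardForUnspecifiedPartitioner, clears the intra-input key correlation flag. A minimal, self-contained sketch of that pattern follows; SimpleEdge and the partitioner marker types are simplified stand-ins invented for illustration, not the actual Flink classes.

// Sketch only: SimpleEdge and the marker partitioner types below are
// hypothetical stand-ins, not Flink's real StreamEdge/partitioner classes.
public class ForwardEdgeSketch {

    interface Partitioner {}

    static final class ForwardPartitioner implements Partitioner {}

    static final class ForwardForUnspecifiedPartitioner implements Partitioner {}

    static final class SimpleEdge {
        Partitioner partitioner = new ForwardForUnspecifiedPartitioner();
        boolean intraInputKeyCorrelated = true;
    }

    static void convertToForwardEdge(SimpleEdge edge) {
        Partitioner original = edge.partitioner;
        edge.partitioner = new ForwardPartitioner();
        // An unspecified-forward edge carries no intra-input key correlation,
        // so the flag is cleared when the edge is rewritten.
        if (original instanceof ForwardForUnspecifiedPartitioner) {
            edge.intraInputKeyCorrelated = false;
        }
    }

    public static void main(String[] args) {
        SimpleEdge edge = new SimpleEdge();
        convertToForwardEdge(edge);
        System.out.println("intraInputKeyCorrelated = " + edge.intraInputKeyCorrelated); // prints false
    }
}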
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 3d604a0887d..bc750f94b59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -30,7 +30,6 @@ import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An edge in the streaming topology. One edge like this does not necessarily gets converted to a
@@ -291,8 +290,6 @@ public class StreamEdge implements Serializable {
     }
 
     public void setIntraInputKeyCorrelated(boolean intraInputKeyCorrelated) {
-        // We hope to strictly control the behavior of this modification to avoid unexpected errors.
-        checkState(interInputsKeysCorrelated, "interInputsKeysCorrelated must be true");
         this.intraInputKeyCorrelated = intraInputKeyCorrelated;
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java
index d62cde9f63c..39d7d7da727 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
 /** Helper class that provides read-only StreamEdge. */
 @Internal
@@ -53,8 +52,8 @@ public class ImmutableStreamEdge {
         return streamEdge.getPartitioner() instanceof ForwardForConsecutiveHashPartitioner;
     }
 
-    public boolean isExactForwardEdge() {
-        return streamEdge.getPartitioner().getClass().equals(ForwardPartitioner.class);
+    public boolean isIntraInputKeyCorrelated() {
+        return streamEdge.isIntraInputKeyCorrelated();
     }
 
     public boolean isBroadcastEdge() {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala
index 4a2e05b82d0..9b1bd4d0785 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala
@@ -81,6 +81,12 @@ class AdaptiveSkewedJoinITCase extends AdaptiveJoinITCase {
     checkResult(sql)
   }
 
+  @Test
+  def testJoinWithUnspecifiedForwardOutput(): Unit = {
+    val sql = "SELECT a1 as a, b1 as b, c1 as c, d1 as d FROM T1, T2 WHERE a1 = a2"
+    checkResult(sql)
+  }
+
   override def checkResult(sql: String): Unit = {
     tEnv.getConfig
       .set(
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java
index e9c628e20bd..ea0420c50f8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java
@@ -125,11 +125,10 @@ public class AdaptiveSkewedJoinOptimizationStrategy
         }
         if (adaptiveSkewedJoinOptimizationStrategy
                 == OptimizerConfigOptions.AdaptiveSkewedJoinOptimizationStrategy.AUTO) {
-            return !existExactForwardOutEdge(adaptiveJoinNode.getOutEdges())
-                    && !existForwardForConsecutiveHashOutEdge(adaptiveJoinNode.getOutEdges());
+            return canPerformOptimizationAutomatic(adaptiveJoinNode);
         } else if (adaptiveSkewedJoinOptimizationStrategy
                 == OptimizerConfigOptions.AdaptiveSkewedJoinOptimizationStrategy.FORCED) {
-            return !existExactForwardOutEdge(adaptiveJoinNode.getOutEdges());
+            return canPerformOptimizationForced(adaptiveJoinNode);
         } else {
             return false;
         }
@@ -301,11 +300,23 @@ public class AdaptiveSkewedJoinOptimizationStrategy
         return false;
     }
 
-    private static boolean existExactForwardOutEdge(List<ImmutableStreamEdge> edges) {
-        return edges.stream().anyMatch(ImmutableStreamEdge::isExactForwardEdge);
+    private static boolean canPerformOptimizationAutomatic(ImmutableStreamNode adaptiveJoinNode) {
+        // In AUTO mode, we need to ensure that there are no intra-correlated out edges, so that
+        // applying this optimization will not break data correctness or introduce additional
+        // performance overhead.
+        return adaptiveJoinNode.getOutEdges().stream()
+                .noneMatch(ImmutableStreamEdge::isIntraInputKeyCorrelated);
     }
 
-    private static boolean existForwardForConsecutiveHashOutEdge(List<ImmutableStreamEdge> edges) {
-        return edges.stream().anyMatch(ImmutableStreamEdge::isForwardForConsecutiveHashEdge);
+    private static boolean canPerformOptimizationForced(ImmutableStreamNode adaptiveJoinNode) {
+        // In FORCED mode, an intra-correlated out edge is acceptable only if it is a
+        // ForwardForConsecutiveHash edge, because its partitioner can be changed to a
+        // HashPartitioner to keep the data correct after the optimization is applied. Otherwise,
+        // this optimization is not allowed.
+        return adaptiveJoinNode.getOutEdges().stream()
+                .noneMatch(
+                        edge ->
+                                edge.isIntraInputKeyCorrelated()
+                                        && !edge.isForwardForConsecutiveHashEdge());
     }
 }
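
To summarize the eligibility rules in the hunk above: AUTO rejects the optimization if any out edge is intra-input key correlated, while FORCED tolerates a correlated out edge only when it is a ForwardForConsecutiveHash edge, whose partitioner can later be swapped for a hash partitioner. The sketch below restates the two predicates over a minimal Edge interface; Edge is a stand-in invented for illustration that mirrors the two ImmutableStreamEdge predicates used above, not Flink's real API.

import java.util.List;

// Sketch only: Edge mirrors the checks above but is invented for illustration;
// it is not Flink's ImmutableStreamEdge API.
public class SkewedJoinEligibilitySketch {

    interface Edge {
        boolean isIntraInputKeyCorrelated();

        boolean isForwardForConsecutiveHashEdge();
    }

    // AUTO: no out edge may carry intra-input key correlation at all.
    static boolean canOptimizeAuto(List<Edge> outEdges) {
        return outEdges.stream().noneMatch(Edge::isIntraInputKeyCorrelated);
    }

    // FORCED: a correlated out edge is tolerated only if it is a
    // ForwardForConsecutiveHash edge, which can be rewritten to a hash
    // partitioner to keep results correct.
    static boolean canOptimizeForced(List<Edge> outEdges) {
        return outEdges.stream()
                .noneMatch(e -> e.isIntraInputKeyCorrelated() && !e.isForwardForConsecutiveHashEdge());
    }
}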
