This is an automated email from the ASF dual-hosted git repository. junrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d774cf1bdea [FLINK-37139][table-planner] Improve AdaptiveSkewedJoinOptimizationStrategy to make it effective in more scenarios (#25989) d774cf1bdea is described below commit d774cf1bdeac654b35993f43718e4fdc68f129cc Author: Lei Yang <30954408+noor...@users.noreply.github.com> AuthorDate: Thu Jan 16 13:00:32 2025 +0800 [FLINK-37139][table-planner] Improve AdaptiveSkewedJoinOptimizationStrategy to make it effective in more scenarios (#25989) --- .../streaming/api/graph/AdaptiveGraphManager.java | 5 +++++ .../flink/streaming/api/graph/StreamEdge.java | 3 --- .../api/graph/util/ImmutableStreamEdge.java | 5 ++--- .../sql/adaptive/AdaptiveSkewedJoinITCase.scala | 6 ++++++ .../AdaptiveSkewedJoinOptimizationStrategy.java | 25 ++++++++++++++++------ 5 files changed, 31 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java index cb327ac9600..04fa7c997a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java @@ -618,6 +618,11 @@ public class AdaptiveGraphManager streamGraph.getStreamNode(edge.getSourceId()), streamGraph)) || isChainable(edge, streamGraph)) { edge.setPartitioner(new ForwardPartitioner<>()); + // Currently, there is no intra input key correlation for edge with + // ForwardForUnspecifiedPartitioner, and we need to modify it to false. 
+ if (partitioner instanceof ForwardForUnspecifiedPartitioner) { + edge.setIntraInputKeyCorrelated(false); + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java index 3d604a0887d..bc750f94b59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java @@ -30,7 +30,6 @@ import java.util.Objects; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * An edge in the streaming topology. One edge like this does not necessarily gets converted to a @@ -291,8 +290,6 @@ public class StreamEdge implements Serializable { } public void setIntraInputKeyCorrelated(boolean intraInputKeyCorrelated) { - // We hope to strictly control the behavior of this modification to avoid unexpected errors. 
- checkState(interInputsKeysCorrelated, "interInputsKeysCorrelated must be true"); this.intraInputKeyCorrelated = intraInputKeyCorrelated; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java index d62cde9f63c..39d7d7da727 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java @@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.ForwardForConsecutiveHashPartitioner; -import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; /** Helper class that provides read-only StreamEdge. 
*/ @Internal @@ -53,8 +52,8 @@ public class ImmutableStreamEdge { return streamEdge.getPartitioner() instanceof ForwardForConsecutiveHashPartitioner; } - public boolean isExactForwardEdge() { - return streamEdge.getPartitioner().getClass().equals(ForwardPartitioner.class); + public boolean isIntraInputKeyCorrelated() { + return streamEdge.isIntraInputKeyCorrelated(); } public boolean isBroadcastEdge() { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala index 4a2e05b82d0..9b1bd4d0785 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/adaptive/AdaptiveSkewedJoinITCase.scala @@ -81,6 +81,12 @@ class AdaptiveSkewedJoinITCase extends AdaptiveJoinITCase { checkResult(sql) } + @Test + def testJoinWithUnspecifiedForwardOutput(): Unit = { + val sql = "SELECT a1 as a, b1 as b, c1 as c, d1 as d FROM T1, T2 WHERE a1 = a2" + checkResult(sql) + } + override def checkResult(sql: String): Unit = { tEnv.getConfig .set( diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java index e9c628e20bd..ea0420c50f8 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/strategy/AdaptiveSkewedJoinOptimizationStrategy.java @@ -125,11 +125,10 @@ public class 
AdaptiveSkewedJoinOptimizationStrategy } if (adaptiveSkewedJoinOptimizationStrategy == OptimizerConfigOptions.AdaptiveSkewedJoinOptimizationStrategy.AUTO) { - return !existExactForwardOutEdge(adaptiveJoinNode.getOutEdges()) - && !existForwardForConsecutiveHashOutEdge(adaptiveJoinNode.getOutEdges()); + return canPerformOptimizationAutomatic(adaptiveJoinNode); } else if (adaptiveSkewedJoinOptimizationStrategy == OptimizerConfigOptions.AdaptiveSkewedJoinOptimizationStrategy.FORCED) { - return !existExactForwardOutEdge(adaptiveJoinNode.getOutEdges()); + return canPerformOptimizationForced(adaptiveJoinNode); } else { return false; } @@ -301,11 +300,23 @@ public class AdaptiveSkewedJoinOptimizationStrategy return false; } - private static boolean existExactForwardOutEdge(List<ImmutableStreamEdge> edges) { - return edges.stream().anyMatch(ImmutableStreamEdge::isExactForwardEdge); + private static boolean canPerformOptimizationAutomatic(ImmutableStreamNode adaptiveJoinNode) { + // In AUTO mode, we need to ensure that there are no intra-correlated out edges to ensure the + // application of this optimization wouldn't break data correctness or introduce additional + // performance overhead. + return adaptiveJoinNode.getOutEdges().stream() + .noneMatch(ImmutableStreamEdge::isIntraInputKeyCorrelated); } - private static boolean existForwardForConsecutiveHashOutEdge(List<ImmutableStreamEdge> edges) { - return edges.stream().anyMatch(ImmutableStreamEdge::isForwardForConsecutiveHashEdge); + private static boolean canPerformOptimizationForced(ImmutableStreamNode adaptiveJoinNode) { + // In FORCED mode, if there is an intra-correlated out edge, and the type of it is + // ForwardForConsecutiveHash, we can modify its partitioner to HashPartitioner to ensure + // the data correctness after the optimization is applied. Otherwise, this optimization is not + // allowed. 
+ return adaptiveJoinNode.getOutEdges().stream() + .noneMatch( + edge -> + edge.isIntraInputKeyCorrelated() + && !edge.isForwardForConsecutiveHashEdge()); } }