zhuzhurk commented on code in PR #25859:
URL: https://github.com/apache/flink/pull/25859#discussion_r1912604761
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -177,6 +177,53 @@ public class OptimizerConfigOptions {
+ "hash join optimization is only performed at runtime, and NONE "
+ "means the optimization is only carried out at compile time.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<AdaptiveSkewedJoinOptimizationStrategy>
+ TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY =
+ key("table.optimizer.skewed-join-optimization.strategy")
+ .enumType(AdaptiveSkewedJoinOptimizationStrategy.class)
+ .defaultValue(AdaptiveSkewedJoinOptimizationStrategy.AUTO)
+ .withDescription(
+ "Flink will handle skew in shuffled joins (sort-merge and hash) "
+ + "at runtime by splitting data corresponding to the skewed join "
+ + "key. The value of this configuration determines how Flink performs "
+ + "this optimization. AUTO means Flink will automatically apply this "
+ + "optimization, FORCED means Flink will enforce this optimization even "
+ + "if it introduces extra hash shuffle, and NONE means this optimization "
+ + "will not be executed.");
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<Double>
+ TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_SKEWED_FACTOR =
+ key("table.optimizer.skewed-join-optimization.skewed-factor")
+ .doubleType()
+ .defaultValue(4.0)
+ .withDescription(
+ "During the join phase, Flink will automatically reduce the ratio of the "
+ + "maximum to median concurrent task processing data volume to below "
+ + "the skewed-factor and will also achieve a more balanced data distribution, "
Review Comment:
> Flink will automatically reduce the ratio of the maximum to median concurrent task processing data volume to below the skewed-factor and will also achieve a more balanced data distribution

When a join operator instance encounters input data that exceeds N times the median size of other concurrent join operator instances, it is considered skewed (where N represents this skewed-factor). In such cases, Flink may automatically split the skewed data into multiple parts to ensure a more balanced data distribution, unless the data volume is below the skewed threshold (defined using `table.optimizer.skewed-join-optimization.skewed-threshold`).
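The rule in the suggested wording can be sketched as a predicate. This is a hypothetical illustration, not Flink's implementation: the class `SkewCheck`, its method names, and the choice to compare against the median of the *other* subtasks (following the suggestion's phrasing) are assumptions, and sizes are plain byte counts.

```java
import java.util.Arrays;

/** Hypothetical sketch of the skew rule from the suggested description; not Flink's code. */
public final class SkewCheck {

    private SkewCheck() {}

    /** Median of the given sizes; the mean of the two middle values for an even count. */
    public static double median(long[] sizes) {
        if (sizes.length == 0) {
            throw new IllegalArgumentException("median of an empty array");
        }
        long[] sorted = sizes.clone();
        Arrays.sort(sorted);
        int n = sorted.length;
        return n % 2 == 1 ? sorted[n / 2] : (sorted[n / 2 - 1] + sorted[n / 2]) / 2.0;
    }

    /**
     * True if subtask {@code index} reads more than skewedFactor times the median input of the
     * other subtasks and also more than skewedThresholdBytes.
     */
    public static boolean isSkewed(
            long[] bytesPerSubtask, int index, double skewedFactor, long skewedThresholdBytes) {
        long size = bytesPerSubtask[index];
        // Below the threshold, or no peers to compare against: never treated as skewed.
        if (size <= skewedThresholdBytes || bytesPerSubtask.length < 2) {
            return false;
        }
        // Collect the sizes of all other subtasks.
        long[] others = new long[bytesPerSubtask.length - 1];
        for (int i = 0, j = 0; i < bytesPerSubtask.length; i++) {
            if (i != index) {
                others[j++] = bytesPerSubtask[i];
            }
        }
        return size > skewedFactor * median(others);
    }
}
```

Keeping the threshold check first mirrors the "unless the data volume is below the skewed threshold" clause: small inputs are never split, however lopsided the ratio.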
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -177,6 +177,53 @@ public class OptimizerConfigOptions {
+ "hash join optimization is only performed at runtime, and NONE "
+ "means the optimization is only carried out at compile time.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<AdaptiveSkewedJoinOptimizationStrategy>
+ TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY =
+ key("table.optimizer.skewed-join-optimization.strategy")
+ .enumType(AdaptiveSkewedJoinOptimizationStrategy.class)
+ .defaultValue(AdaptiveSkewedJoinOptimizationStrategy.AUTO)
+ .withDescription(
+ "Flink will handle skew in shuffled joins (sort-merge and hash) "
+ + "at runtime by splitting data corresponding to the skewed join "
Review Comment:
corresponding to -> according to
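The strategy description in this hunk names three values. Read literally, FORCED applies the optimization even when it introduces an extra hash shuffle, NONE disables it, and AUTO applies it automatically, which by contrast with FORCED suggests it is skipped when an extra hash shuffle would be needed. The sketch below encodes that reading; the class, the `requiresExtraHashShuffle` input, and the AUTO branch are hypothetical inferences, not Flink's planner logic.

```java
/** Hypothetical encoding of the three documented strategy values; not Flink's planner logic. */
public final class SkewedJoinStrategySketch {

    private SkewedJoinStrategySketch() {}

    /** Mirrors the AUTO / FORCED / NONE values named in the option description. */
    public enum Strategy {
        AUTO,
        FORCED,
        NONE
    }

    /**
     * Decides whether the skewed-join optimization would be applied.
     *
     * @param requiresExtraHashShuffle assumed input: whether applying it would add a hash shuffle
     */
    public static boolean shouldApply(Strategy strategy, boolean requiresExtraHashShuffle) {
        switch (strategy) {
            case NONE:
                return false; // never executed
            case FORCED:
                return true; // applied even if an extra hash shuffle is introduced
            case AUTO:
                return !requiresExtraHashShuffle; // inferred from the contrast with FORCED
            default:
                throw new IllegalStateException("Unknown strategy: " + strategy);
        }
    }
}
```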
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -177,6 +177,53 @@ public class OptimizerConfigOptions {
+ "hash join optimization is only performed at runtime, and NONE "
+ "means the optimization is only carried out at compile time.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<AdaptiveSkewedJoinOptimizationStrategy>
+ TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY =
+ key("table.optimizer.skewed-join-optimization.strategy")
+ .enumType(AdaptiveSkewedJoinOptimizationStrategy.class)
+ .defaultValue(AdaptiveSkewedJoinOptimizationStrategy.AUTO)
+ .withDescription(
+ "Flink will handle skew in shuffled joins (sort-merge and hash) "
+ + "at runtime by splitting data corresponding to the skewed join "
+ + "key. The value of this configuration determines how Flink performs "
+ + "this optimization. AUTO means Flink will automatically apply this "
+ + "optimization, FORCED means Flink will enforce this optimization even "
+ + "if it introduces extra hash shuffle, and NONE means this optimization "
+ + "will not be executed.");
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<Double>
+ TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_SKEWED_FACTOR =
+ key("table.optimizer.skewed-join-optimization.skewed-factor")
+ .doubleType()
+ .defaultValue(4.0)
+ .withDescription(
+ "During the join phase, Flink will automatically reduce the ratio of the "
+ + "maximum to median concurrent task processing data volume to below "
+ + "the skewed-factor and will also achieve a more balanced data distribution, "
+ + "unless the maximum value is below the `table.optimizer.skewed-join-optimization.skewed-threshold`. Note that "
+ + "this optimization has additional overhead, and after balancing, there "
+ + "may still be a 2x difference in data volume. Users can adjust this "
+ + "parameter based on the 'Read Bytes' metric. We recommend that this value "
+ + "be set to no less than 3 when the left and right tables have similar data "
+ + "volume, and to no less than 2 in other cases.");
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<MemorySize>
+ TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_SKEWED_THRESHOLD =
+ key("table.optimizer.skewed-join-optimization.skewed-threshold")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(256))
+ .withDescription(
+ "During the join phase, when the maximum data volume processed by a concurrent task "
+ + "is greater than the skewed-threshold, Flink can automatically reduce the "
+ + "ratio of the maximum data volume processed by a concurrent task to the median "
+ + "to less than the `table.optimizer.skewed-join-optimization.skewed-factor` and "
+ + "will also achieve a more balanced data distribution. Note that this optimization "
Review Comment:
> During the join phase, when the maximum data volume processed by a concurrent task is greater than the skewed-threshold, Flink can automatically reduce the ratio of the maximum data volume processed by a concurrent task to the median to less than the `table.optimizer.skewed-join-optimization.skewed-factor` and will also achieve a more balanced data distribution.

When a join operator instance encounters input data that exceeds N times the median size of other concurrent join operator instances, it is considered skewed (where N represents the skewed-factor). In such cases, Flink may automatically split the skewed data into multiple parts to ensure a more balanced data distribution, unless the data volume is below the skewed threshold (defined using `table.optimizer.skewed-join-optimization.skewed-threshold`).
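A worked illustration of the suggested wording, using the defaults from this diff (skewed-factor N = 4.0, skewed-threshold T = 256 MiB). Here s_i is the input size of join subtask i; the sizes are made up, and comparing against the median of the other subtasks follows the reviewer's phrasing rather than a confirmed implementation detail. The second case shows the threshold's role: the ratio test alone would flag the subtask, but it stays under T.

```latex
\begin{aligned}
&\text{skewed}(i) \iff s_i > N \cdot \operatorname{median}_{j \neq i}(s_j) \;\wedge\; s_i > T,
  \qquad N = 4.0,\; T = 256\ \text{MiB} \\[4pt]
&s = (100, 120, 130, 1200)\ \text{MiB},\ i = 4:\quad
  \operatorname{median}_{j \neq 4} = 120,\quad 4.0 \cdot 120 = 480 < 1200,\quad 1200 > 256
  \;\Rightarrow\; \text{skewed} \\[4pt]
&s = (10, 12, 13, 200)\ \text{MiB},\ i = 4:\quad
  \operatorname{median}_{j \neq 4} = 12,\quad 4.0 \cdot 12 = 48 < 200,\quad 200 \le 256
  \;\Rightarrow\; \text{not split}
\end{aligned}
```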
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/StreamEdgeUpdateRequestInfo.java:
##########
@@ -34,6 +34,7 @@ public class StreamEdgeUpdateRequestInfo {
// For two or more inputs, typeNumber must be >= 1, and 0 means the request will not change the
// typeNumber.
private int typeNumber;
+ private Boolean intraInputKeyCorrelated;
Review Comment:
It's better to use `Optional<Boolean>` to explicitly show that this property is optional.
And I prefer to add a blank line before this field, because it is not related to the comment above.
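A minimal sketch of the suggested field layout, using a hypothetical stand-in class `EdgeUpdateRequestSketch` rather than the real `StreamEdgeUpdateRequestInfo`, whose other members are not shown in the hunk. The blank line separates the new field from the `typeNumber` comment, and `Optional.empty()` marks that the property was not set; the accessor names and the "not set" semantics are assumptions.

```java
import java.util.Optional;

/** Hypothetical stand-in for StreamEdgeUpdateRequestInfo showing only the fields under review. */
public class EdgeUpdateRequestSketch {

    // For two or more inputs, typeNumber must be >= 1, and 0 means the request will not change the
    // typeNumber.
    private int typeNumber;

    // Separated by a blank line: unrelated to the typeNumber comment above.
    // Optional.empty() means the property was not set on this request (assumed semantics).
    private Optional<Boolean> intraInputKeyCorrelated = Optional.empty();

    public EdgeUpdateRequestSketch withTypeNumber(int typeNumber) {
        this.typeNumber = typeNumber;
        return this;
    }

    public EdgeUpdateRequestSketch withIntraInputKeyCorrelated(boolean intraInputKeyCorrelated) {
        this.intraInputKeyCorrelated = Optional.of(intraInputKeyCorrelated);
        return this;
    }

    public int getTypeNumber() {
        return typeNumber;
    }

    public Optional<Boolean> getIntraInputKeyCorrelated() {
        return intraInputKeyCorrelated;
    }
}
```

One trade-off: `java.util.Optional` does not implement `Serializable`, so an `Optional` field only fits if the request object is never Java-serialized; a nullable `Boolean` field with an `Optional`-returning getter is a common alternative.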
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/adaptive/AdaptiveJoinTest.scala:
##########
@@ -38,6 +38,9 @@ class AdaptiveJoinTest extends TableTestBase {
util.tableConfig.set(
OptimizerConfigOptions.TABLE_OPTIMIZER_ADAPTIVE_BROADCAST_JOIN_STRATEGY,
OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.AUTO)
+ util.tableConfig.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY,
+ OptimizerConfigOptions.AdaptiveSkewedJoinOptimizationStrategy.AUTO)
Review Comment:
It looks to me like this change is not needed here.
Instead, a few new cases should be added to test how
`TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY` affects the
generated plan.
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/AdaptiveJoinProcessor.java:
##########
@@ -142,6 +142,11 @@ private boolean isAdaptiveJoinEnabled(ProcessorContext context) {
!= OptimizerConfigOptions.AdaptiveBroadcastJoinStrategy.NONE
&& !TableConfigUtils.isOperatorDisabled(
tableConfig, OperatorType.BroadcastHashJoin);
+ isAdaptiveJoinEnabled |=
Review Comment:
It looks to me like this new case is not covered in `AdaptiveJoinTest`.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java:
##########
@@ -177,6 +177,53 @@ public class OptimizerConfigOptions {
+ "hash join optimization is only performed at runtime, and NONE "
+ "means the optimization is only carried out at compile time.");
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<AdaptiveSkewedJoinOptimizationStrategy>
+ TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY =
+ key("table.optimizer.skewed-join-optimization.strategy")
+ .enumType(AdaptiveSkewedJoinOptimizationStrategy.class)
+ .defaultValue(AdaptiveSkewedJoinOptimizationStrategy.AUTO)
+ .withDescription(
+ "Flink will handle skew in shuffled joins (sort-merge and hash) "
+ + "at runtime by splitting data corresponding to the skewed join "
+ + "key. The value of this configuration determines how Flink performs "
+ + "this optimization. AUTO means Flink will automatically apply this "
+ + "optimization, FORCED means Flink will enforce this optimization even "
+ + "if it introduces extra hash shuffle, and NONE means this optimization "
+ + "will not be executed.");
+
+ @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+ public static final ConfigOption<Double>
+ TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_SKEWED_FACTOR =
+ key("table.optimizer.skewed-join-optimization.skewed-factor")
+ .doubleType()
+ .defaultValue(4.0)
+ .withDescription(
+ "During the join phase, Flink will automatically reduce the ratio of the "
+ + "maximum to median concurrent task processing data volume to below "
+ + "the skewed-factor and will also achieve a more balanced data distribution, "
Review Comment:
The suggested rewording is meant to make the description easier for Flink users to understand.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamEdge.java:
##########
@@ -45,4 +48,16 @@ public int getSourceId() {
public String getEdgeId() {
return streamEdge.getEdgeId();
}
+
+ public boolean isForwardForConsecutiveHashEdge() {
+ return streamEdge.getPartitioner() instanceof ForwardForConsecutiveHashPartitioner;
+ }
+
+ public boolean isExactForwardEdge() {
+ return streamEdge.getPartitioner().getClass().equals(ForwardPartitioner.class);
+ }
+
+ public boolean isBroadcastEdge() {
Review Comment:
This commit should come before the commit `Modify AdaptiveJoinProcessor to support adaptive skewed join optimization`; otherwise that commit won't compile, since it relies on `isBroadcastEdge()`.
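The two partitioner checks in this hunk deliberately differ: `isForwardForConsecutiveHashEdge()` uses `instanceof`, while `isExactForwardEdge()` compares the exact class. That distinction matters if `ForwardForConsecutiveHashPartitioner` is a subclass of `ForwardPartitioner`, which is assumed here. The sketch uses hypothetical stub classes in place of the Flink partitioners to show that an `instanceof ForwardPartitioner` test would also match the subclass, whereas the exact-class comparison does not.

```java
/** Sketch contrasting an instanceof check with an exact-class check (hypothetical stubs). */
public final class EdgeKindSketch {

    private EdgeKindSketch() {}

    /** Stand-in for ForwardPartitioner. */
    public static class ForwardPartitionerStub {}

    /** Stand-in for ForwardForConsecutiveHashPartitioner, assumed to extend the forward one. */
    public static class ForwardForConsecutiveHashPartitionerStub extends ForwardPartitionerStub {}

    /** Matches the consecutive-hash variant (and any subclass of it). */
    public static boolean isForwardForConsecutiveHash(Object partitioner) {
        return partitioner instanceof ForwardForConsecutiveHashPartitionerStub;
    }

    /** Matches only the plain forward partitioner, not its subclasses. */
    public static boolean isExactForward(Object partitioner) {
        return partitioner.getClass().equals(ForwardPartitionerStub.class);
    }
}
```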
--
This is an automated message from the Apache Git Service.