This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ce42c0a6045cb3b81cbe279350a100759f6eb32b Author: noorall <863485...@qq.com> AuthorDate: Thu Dec 26 10:45:05 2024 +0800 [FLINK-36629][table-planner] Introduce configuration options for adaptive skewed join optimization. --- .../generated/optimizer_config_configuration.html | 18 ++++++ .../table/api/config/OptimizerConfigOptions.java | 73 ++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html index 0ab126d602e..652242a1771 100644 --- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html +++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html @@ -8,6 +8,24 @@ </tr> </thead> <tbody> + <tr> + <td><h5>table.optimizer.skewed-join-optimization.strategy</h5><br> <span class="label label-primary">Batch</span></td> + <td style="word-wrap: break-word;">auto</td> + <td><p>Enum</p></td> + <td>Flink will handle skew in shuffled joins (sort-merge and hash) at runtime by splitting data according to the skewed join key. The value of this configuration determines how Flink performs this optimization. AUTO means Flink will automatically apply this optimization, FORCED means Flink will enforce this optimization even if it introduces extra hash shuffle, and NONE means this optimization will not be executed.<br /><br />Possible values:<ul><li>"auto": Flink will automa [...] + </tr> + <tr> + <td><h5>table.optimizer.skewed-join-optimization.skewed-factor</h5><br> <span class="label label-primary">Batch</span></td> + <td style="word-wrap: break-word;">4.0</td> + <td><p>Double</p></td> + <td>When a join operator instance encounters input data that exceeds N times the median size of other concurrent join operator instances, it is considered skewed (where N represents this skewed-factor). 
In such cases, Flink may automatically split the skewed data into multiple parts to ensure a more balanced data distribution, unless the data volume is below the skewed threshold (defined using table.optimizer.skewed-join-optimization.skewed-threshold).</td> + </tr> + <tr> + <td><h5>table.optimizer.skewed-join-optimization.skewed-threshold</h5><br> <span class="label label-primary">Batch</span></td> + <td style="word-wrap: break-word;">256 mb</td> + <td><p>MemorySize</p></td> + <td>When a join operator instance encounters input data that exceeds N times the median size of other concurrent join operator instances, it is considered skewed (where N represents the table.optimizer.skewed-join-optimization.skewed-factor). In such cases, Flink may automatically split the skewed data into multiple parts to ensure a more balanced data distribution, unless the data volume is below this skewed threshold.</td> + </tr> <tr> <td><h5>table.optimizer.adaptive-broadcast-join.strategy</h5><br> <span class="label label-primary">Batch</span></td> <td style="word-wrap: break-word;">auto</td> diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index ccda21813f2..6f58edac8a3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -177,6 +177,49 @@ public class OptimizerConfigOptions { + "hash join optimization is only performed at runtime, and NONE " + "means the optimization is only carried out at compile time."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) + public static final ConfigOption<AdaptiveSkewedJoinOptimizationStrategy> + TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY = 
key("table.optimizer.skewed-join-optimization.strategy") + .enumType(AdaptiveSkewedJoinOptimizationStrategy.class) + .defaultValue(AdaptiveSkewedJoinOptimizationStrategy.AUTO) + .withDescription( + "Flink will handle skew in shuffled joins (sort-merge and hash) " + + "at runtime by splitting data according to the skewed join " + + "key. The value of this configuration determines how Flink performs " + + "this optimization. AUTO means Flink will automatically apply this " + + "optimization, FORCED means Flink will enforce this optimization even " + + "if it introduces extra hash shuffle, and NONE means this optimization " + + "will not be executed."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) + public static final ConfigOption<Double> + TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_SKEWED_FACTOR = + key("table.optimizer.skewed-join-optimization.skewed-factor") + .doubleType() + .defaultValue(4.0) + .withDescription( + "When a join operator instance encounters input data that exceeds N times the median " + + "size of other concurrent join operator instances, it is considered skewed " + + "(where N represents this skewed-factor). 
In such cases, Flink may automatically " + + "split the skewed data into multiple parts to ensure a more balanced data " + + "distribution, unless the data volume is below the skewed threshold(defined " + + "using table.optimizer.skewed-join-optimization.skewed-threshold)."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH) + public static final ConfigOption<MemorySize> + TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_SKEWED_THRESHOLD = + key("table.optimizer.skewed-join-optimization.skewed-threshold") + .memoryType() + .defaultValue(MemorySize.ofMebiBytes(256)) + .withDescription( + "When a join operator instance encounters input data that exceeds N times the median " + + "size of other concurrent join operator instances, it is considered skewed " + + "(where N represents the table.optimizer.skewed-join-optimization.skewed-factor). " + + "In such cases, Flink may automatically split the skewed data into multiple " + + "parts to ensure a more balanced data distribution, unless the data volume " + + "is below this skewed threshold."); + /** * The data volume of build side needs to be under this value. If the data volume of build side * is too large, the building overhead will be too large, which may lead to a negative impact on @@ -352,4 +395,34 @@ public class OptimizerConfigOptions { return description; } } + + /** Strategies used for {@link #TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY}. 
*/ + @PublicEvolving + public enum AdaptiveSkewedJoinOptimizationStrategy implements DescribedEnum { + AUTO("auto", text(" Flink will automatically perform this optimization.")), + FORCED( + "forced", + text( + "Flink will perform this optimization even if it introduces extra hash shuffling.")), + NONE("none", text("Skewed join optimization will not be performed.")); + + private final String value; + + private final InlineElement description; + + AdaptiveSkewedJoinOptimizationStrategy(String value, InlineElement description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return description; + } + } }