This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ce42c0a6045cb3b81cbe279350a100759f6eb32b
Author: noorall <863485...@qq.com>
AuthorDate: Thu Dec 26 10:45:05 2024 +0800

    [FLINK-36629][table-planner] Introduce configuration options for adaptive 
skewed join optimization.
---
 .../generated/optimizer_config_configuration.html  | 18 ++++++
 .../table/api/config/OptimizerConfigOptions.java   | 73 ++++++++++++++++++++++
 2 files changed, 91 insertions(+)

diff --git 
a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html 
b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
index 0ab126d602e..652242a1771 100644
--- a/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/optimizer_config_configuration.html
@@ -8,6 +8,24 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>table.optimizer.skewed-join-optimization.strategy</h5><br> 
<span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">auto</td>
+            <td><p>Enum</p></td>
+            <td>Flink will handle skew in shuffled joins (sort-merge and hash) 
at runtime by splitting data according to the skewed join key. The value of 
this configuration determines how Flink performs this optimization. AUTO means 
Flink will automatically apply this optimization, FORCED means Flink will 
enforce this optimization even if it introduces extra hash shuffle, and NONE 
means this optimization will not be executed.<br /><br />Possible 
values:<ul><li>"auto":  Flink will automa [...]
+        </tr>
+        <tr>
+            
<td><h5>table.optimizer.skewed-join-optimization.skewed-factor</h5><br> <span 
class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">4.0</td>
+            <td><p>Double</p></td>
+            <td>When a join operator instance encounters input data that 
exceeds N times the median size of other concurrent join operator instances, it 
is considered skewed (where N represents this skewed-factor). In such cases, 
Flink may automatically split the skewed data into multiple parts to ensure a 
more balanced data distribution, unless the data volume is below the skewed 
threshold(defined using 
table.optimizer.skewed-join-optimization.skewed-threshold).</td>
+        </tr>
+        <tr>
+            
<td><h5>table.optimizer.skewed-join-optimization.skewed-threshold</h5><br> 
<span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">256 mb</td>
+            <td><p>MemorySize</p></td>
+            <td>When a join operator instance encounters input data that 
exceeds N times the median size of other concurrent join operator instances, it 
is considered skewed (where N represents the 
table.optimizer.skewed-join-optimization.skewed-factor). In such cases, Flink 
may automatically split the skewed data into multiple parts to ensure a more 
balanced data distribution, unless the data volume is below this skewed 
threshold.</td>
+        </tr>
         <tr>
             <td><h5>table.optimizer.adaptive-broadcast-join.strategy</h5><br> 
<span class="label label-primary">Batch</span></td>
             <td style="word-wrap: break-word;">auto</td>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index ccda21813f2..6f58edac8a3 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -177,6 +177,49 @@ public class OptimizerConfigOptions {
                                             + "hash join optimization is only 
performed at runtime, and NONE "
                                             + "means the optimization is only 
carried out at compile time.");
 
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<AdaptiveSkewedJoinOptimizationStrategy>
+            TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY =
+                    key("table.optimizer.skewed-join-optimization.strategy")
+                            
.enumType(AdaptiveSkewedJoinOptimizationStrategy.class)
+                            
.defaultValue(AdaptiveSkewedJoinOptimizationStrategy.AUTO)
+                            .withDescription(
+                                    "Flink will handle skew in shuffled joins 
(sort-merge and hash) "
+                                            + "at runtime by splitting data 
according to the skewed join "
+                                            + "key. The value of this 
configuration determines how Flink performs "
+                                            + "this optimization. AUTO means 
Flink will automatically apply this "
+                                            + "optimization, FORCED means 
Flink will enforce this optimization even "
+                                            + "if it introduces extra hash 
shuffle, and NONE means this optimization "
+                                            + "will not be executed.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<Double>
+            TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_SKEWED_FACTOR =
+                    
key("table.optimizer.skewed-join-optimization.skewed-factor")
+                            .doubleType()
+                            .defaultValue(4.0)
+                            .withDescription(
+                                    "When a join operator instance encounters 
input data that exceeds N times the median "
+                                            + "size of other concurrent join 
operator instances, it is considered skewed "
+                                            + "(where N represents this 
skewed-factor). In such cases, Flink may automatically "
+                                            + "split the skewed data into 
multiple parts to ensure a more balanced data "
+                                            + "distribution, unless the data 
volume is below the skewed threshold(defined "
+                                            + "using 
table.optimizer.skewed-join-optimization.skewed-threshold).");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
+    public static final ConfigOption<MemorySize>
+            TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_SKEWED_THRESHOLD 
=
+                    
key("table.optimizer.skewed-join-optimization.skewed-threshold")
+                            .memoryType()
+                            .defaultValue(MemorySize.ofMebiBytes(256))
+                            .withDescription(
+                                    "When a join operator instance encounters 
input data that exceeds N times the median "
+                                            + "size of other concurrent join 
operator instances, it is considered skewed "
+                                            + "(where N represents the 
table.optimizer.skewed-join-optimization.skewed-factor). "
+                                            + "In such cases, Flink may 
automatically split the skewed data into multiple "
+                                            + "parts to ensure a more balanced 
data distribution, unless the data volume "
+                                            + "is below this skewed 
threshold.");
+
     /**
      * The data volume of build side needs to be under this value. If the data 
volume of build side
      * is too large, the building overhead will be too large, which may lead 
to a negative impact on
@@ -352,4 +395,34 @@ public class OptimizerConfigOptions {
             return description;
         }
     }
+
+    /** Strategies used for {@link 
#TABLE_OPTIMIZER_ADAPTIVE_SKEWED_JOIN_OPTIMIZATION_STRATEGY}. */
+    @PublicEvolving
+    public enum AdaptiveSkewedJoinOptimizationStrategy implements 
DescribedEnum {
+        AUTO("auto", text(" Flink will automatically perform this 
optimization.")),
+        FORCED(
+                "forced",
+                text(
+                        "Flink will perform this optimization even if it 
introduces extra hash shuffling.")),
+        NONE("none", text("Skewed join optimization will not be performed."));
+
+        private final String value;
+
+        private final InlineElement description;
+
+        AdaptiveSkewedJoinOptimizationStrategy(String value, InlineElement 
description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
 }

Reply via email to