This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 04964f30e [#2606] feat(mr): Add safety switch for map-stage combiner 
(#2607)
04964f30e is described below

commit 04964f30e367e5d698f2965c287474ded0f0b71b
Author: l.zonghai <842315...@qq.com>
AuthorDate: Tue Sep 16 10:08:39 2025 +0800

    [#2606] feat(mr): Add safety switch for map-stage combiner (#2607)
    
    ### What changes were proposed in this pull request?
    
    Introduce a configuration `mapreduce.rss.client.combiner.enable` to control
    whether the map-stage combiner runs in the Uniffle MapReduce client.
    The default value is `false` to prevent job instability caused by GC storms
    on large send buffers.
    
    ### Why are the changes needed?
    
    Using the map-stage combiner on a large send buffer
    (`mapreduce.task.io.sort.mb * mapreduce.rss.client.sort.memory.use.threshold`)
    can trigger severe GC overhead, which may stall the MapTask and sender threads
    and lead to job hangs. Most users do not require the combiner by default.
    
    
    ### Does this PR introduce _any_ user-facing change?
    
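    Yes. This adds a new optional configuration for expert users. Because it
    defaults to `false`, the map-stage combiner no longer runs unless it is
    explicitly enabled, which keeps default behavior stable.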
    
    ### How was this patch tested?
    
    Manually tested with MapReduce jobs that define combiners. Verified that
    jobs run successfully with the combiner disabled.
    1. **Combiner disabled**: MapTasks completed normally with fast GC cycles. 
Sample logs:
    ```
    [2025-09-11 19:48:47] S0: 0MB, S1: 0MB, Eden: 299.02MB, Old: 11.53MB, ... 
Total: 0.101s
    ...
    [2025-09-11 19:49:30] S0: 82.88MB, S1: 0MB, Eden: 160.04MB, Old: 485.99MB, 
...Total: 6.683s
    ...
    [2025-09-11 19:49:57] S0: 0MB, S1: 0MB, Eden: 207.66MB, Old: 532.27MB, 
...Total: 12.474s
    ```
    > The MapTask completed successfully within 1 minute.
    
    2. **Combiner enabled**: MapTask GC cycles grew very long; job stalled and 
was eventually killed. Sample logs:
    
    ```
    [2025-09-11 19:52:00] S0: 0MB, S1: 0MB, Eden: 80.49MB, Old: 12.86MB, ... 
Total: 0.054s
    [2025-09-11 19:52:08] S0: 0MB, S1: 0MB, Eden: 515.53MB, Old: 27.24MB, ... 
Total: 0.149s
    ...
    [2025-09-11 20:01:54] S0: 0MB, S1: 0MB, Eden: 60.36MB, Old: 687.51MB, ... 
Total: 242.505s
    ```
    
    > The MapTask did not complete after 9 minutes.
    
    These logs demonstrate that disabling the map-stage combiner avoids severe 
GC overhead and job stalls, validating the safety switch.
    
    ---------
    
    Co-authored-by: Lobo2008 <842315...@qq.coom>
---
 .../hadoop/mapred/RssMapOutputCollector.java       | 31 +++++++++++++++++-----
 .../org/apache/hadoop/mapreduce/RssMRConfig.java   |  4 +++
 .../uniffle/client/util/RssClientConfig.java       |  2 ++
 3 files changed, 31 insertions(+), 6 deletions(-)

diff --git 
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
 
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
index 2db387e3f..0823dbd66 100644
--- 
a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
+++ 
b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java
@@ -78,12 +78,31 @@ public class RssMapOutputCollector<K extends Object, V 
extends Object>
       throw new IOException("Invalid  sort memory use threshold : " + 
sortThreshold);
     }
 
-    // combiner
-    final Counters.Counter combineInputCounter =
-        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
-    combinerRunner =
-        Task.CombinerRunner.create(
-            mrJobConf, mapTask.getTaskID(), combineInputCounter, reporter, 
null);
+    boolean enableCombiner =
+        RssMRUtils.getBoolean(
+            rssJobConf,
+            RssMRConfig.RSS_CLIENT_COMBINER_ENABLE,
+            RssMRConfig.RSS_CLIENT_COMBINER_ENABLE_DEFAULT);
+
+    combinerRunner = null;
+    if (enableCombiner) {
+      try {
+        final Counters.Counter combineInputCounter =
+            reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+        combinerRunner =
+            Task.CombinerRunner.create(
+                mrJobConf, mapTask.getTaskID(), combineInputCounter, reporter, 
null);
+        if (combinerRunner != null) {
+          LOG.info(
+              "Map-stage combiner enabled. Warning: This may cause GC issues 
in large jobs. "
+                  + "Consider setting {}=false if experiencing instability",
+              RssMRConfig.RSS_CLIENT_COMBINER_ENABLE);
+        }
+
+      } catch (Exception e) {
+        LOG.error("Get CombinerClass failed", e);
+      }
+    }
 
     int batch =
         RssMRUtils.getInt(
diff --git 
a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java 
b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index 41f3fbe68..0d61fcf66 100644
--- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -206,6 +206,10 @@ public class RssMRConfig {
   public static final String RSS_REMOTE_MERGE_CLASS_LOADER =
       MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_MERGE_CLASS_LOADER;
 
+  public static final String RSS_CLIENT_COMBINER_ENABLE =
+      MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_COMBINER_ENABLE;
+  public static final boolean RSS_CLIENT_COMBINER_ENABLE_DEFAULT = false;
+
   public static RssConf toRssConf(Configuration jobConf) {
     RssConf rssConf = new RssConf();
     for (Map.Entry<String, String> entry : jobConf) {
diff --git 
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java 
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index a2e60caf7..a4ced2111 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -98,4 +98,6 @@ public class RssClientConfig {
   public static final String RSS_MERGED_BLOCK_SZIE = "rss.merged.block.size";
   public static final int RSS_MERGED_BLOCK_SZIE_DEFAULT = -1;
   public static final String RSS_REMOTE_MERGE_CLASS_LOADER = 
"rss.remote.merge.classloader";
+
+  public static final String RSS_CLIENT_COMBINER_ENABLE = 
"rss.client.combiner.enable";
 }

Reply via email to