This is an automated email from the ASF dual-hosted git repository. zuston pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 04964f30e [#2606] feat(mr): Add safety switch for map-stage combiner (#2607) 04964f30e is described below commit 04964f30e367e5d698f2965c287474ded0f0b71b Author: l.zonghai <842315...@qq.com> AuthorDate: Tue Sep 16 10:08:39 2025 +0800 [#2606] feat(mr): Add safety switch for map-stage combiner (#2607) ### What changes were proposed in this pull request? Introduce a configuration `mapreduce.rss.client.combiner.enable` to control whether the map-stage combiner runs in the Uniffle MapReduce client. The default value is `false`, to prevent job instability caused by GC storms when the combiner runs over a large send buffer. ### Why are the changes needed? Running the map-stage combiner on a large send buffer (`mapreduce.task.io.sort.mb * mapreduce.rss.client.sort.memory.use.threshold`) can trigger severe GC overhead. This may stall the MapTask and sender threads and cause the job to hang. Most users do not need the map-stage combiner, so it is disabled by default. ### Does this PR introduce _any_ user-facing change? Yes. This adds a new optional configuration for expert users. Note that the default behavior changes: the map-stage combiner, which was previously always set up, now runs only when `mapreduce.rss.client.combiner.enable` is set to `true`. ### How was this patch tested? Manually tested with MapReduce jobs that use combiners, and verified that these jobs run successfully with the combiner disabled. 1. **Combiner disabled**: MapTasks completed normally with fast GC cycles. Sample logs: ``` [2025-09-11 19:48:47] S0: 0MB, S1: 0MB, Eden: 299.02MB, Old: 11.53MB, ... Total: 0.101s ... [2025-09-11 19:49:30] S0: 82.88MB, S1: 0MB, Eden: 160.04MB, Old: 485.99MB, ...Total: 6.683s ... [2025-09-11 19:49:57] S0: 0MB, S1: 0MB, Eden: 207.66MB, Old: 532.27MB, ...Total: 12.474s ``` > The MapTask completed successfully within 1 minute. 2. **Combiner enabled**: MapTask GC cycles grew very long; the job stalled and was eventually killed. Sample logs: ``` [2025-09-11 19:52:00] S0: 0MB, S1: 0MB, Eden: 80.49MB, Old: 12.86MB, ... Total: 0.054s [2025-09-11 19:52:08] S0: 0MB, S1: 0MB, Eden: 515.53MB, Old: 27.24MB, ... Total: 0.149s ...
[2025-09-11 20:01:54] S0: 0MB, S1: 0MB, Eden: 60.36MB, Old: 687.51MB, ... Total: 242.505s ``` > The MapTask did not complete after 9 minutes. These logs demonstrate that disabling the map-stage combiner avoids severe GC overhead and job stalls, validating the safety switch. --------- Co-authored-by: Lobo2008 <842315...@qq.coom> --- .../hadoop/mapred/RssMapOutputCollector.java | 31 +++++++++++++++++----- .../org/apache/hadoop/mapreduce/RssMRConfig.java | 4 +++ .../uniffle/client/util/RssClientConfig.java | 2 ++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java index 2db387e3f..0823dbd66 100644 --- a/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java +++ b/client-mr/core/src/main/java/org/apache/hadoop/mapred/RssMapOutputCollector.java @@ -78,12 +78,31 @@ public class RssMapOutputCollector<K extends Object, V extends Object> throw new IOException("Invalid sort memory use threshold : " + sortThreshold); } - // combiner - final Counters.Counter combineInputCounter = - reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS); - combinerRunner = - Task.CombinerRunner.create( - mrJobConf, mapTask.getTaskID(), combineInputCounter, reporter, null); + boolean enableCombiner = + RssMRUtils.getBoolean( + rssJobConf, + RssMRConfig.RSS_CLIENT_COMBINER_ENABLE, + RssMRConfig.RSS_CLIENT_COMBINER_ENABLE_DEFAULT); + + combinerRunner = null; + if (enableCombiner) { + try { + final Counters.Counter combineInputCounter = + reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS); + combinerRunner = + Task.CombinerRunner.create( + mrJobConf, mapTask.getTaskID(), combineInputCounter, reporter, null); + if (combinerRunner != null) { + LOG.info( + "Map-stage combiner enabled. Warning: This may cause GC issues in large jobs. 
" + + "Consider setting {}=false if experiencing instability", + RssMRConfig.RSS_CLIENT_COMBINER_ENABLE); + } + + } catch (Exception e) { + LOG.error("Get CombinerClass failed", e); + } + } int batch = RssMRUtils.getInt( diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java index 41f3fbe68..0d61fcf66 100644 --- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java +++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java @@ -206,6 +206,10 @@ public class RssMRConfig { public static final String RSS_REMOTE_MERGE_CLASS_LOADER = MR_CONFIG_PREFIX + RssClientConfig.RSS_REMOTE_MERGE_CLASS_LOADER; + public static final String RSS_CLIENT_COMBINER_ENABLE = + MR_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_COMBINER_ENABLE; + public static final boolean RSS_CLIENT_COMBINER_ENABLE_DEFAULT = false; + public static RssConf toRssConf(Configuration jobConf) { RssConf rssConf = new RssConf(); for (Map.Entry<String, String> entry : jobConf) { diff --git a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java index a2e60caf7..a4ced2111 100644 --- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java +++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java @@ -98,4 +98,6 @@ public class RssClientConfig { public static final String RSS_MERGED_BLOCK_SZIE = "rss.merged.block.size"; public static final int RSS_MERGED_BLOCK_SZIE_DEFAULT = -1; public static final String RSS_REMOTE_MERGE_CLASS_LOADER = "rss.remote.merge.classloader"; + + public static final String RSS_CLIENT_COMBINER_ENABLE = "rss.client.combiner.enable"; }