This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 8cb662524 [#1603] feat(spark): Disable dataPusher initialization for 
Spark Driver (#2688)
8cb662524 is described below

commit 8cb662524f7e215391dfb77edfefb0ade8faa745
Author: zhan7236 <[email protected]>
AuthorDate: Mon Dec 1 10:34:28 2025 +0800

    [#1603] feat(spark): Disable dataPusher initialization for Spark Driver 
(#2688)
    
    ### What changes were proposed in this pull request?
    
    This PR disables the `dataPusher` initialization for the Spark driver in 
cluster mode, since it is only needed on executors to push shuffle data.
    
    ### Why are the changes needed?
    
    The `dataPusher` is used to push shuffle data to the shuffle servers, which is 
an executor-side operation. In cluster mode, the driver does not push shuffle data, 
so initializing the `dataPusher` (along with its thread pool) for the driver is 
unnecessary and wastes resources.
    
    **Note**: In local mode, the driver also acts as an executor, so the 
`dataPusher` is still initialized in that case.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - Compiled successfully for both Spark 2 and Spark 3 profiles
    - All unit tests pass
    - Integration tests pass (tested `CombineByKeyTest` and `GroupByKeyTest` 
which were previously failing)
    
    ### Summary of Changes:
    1. **RssShuffleManagerBase.java**: Wrapped the `dataPusher` initialization in 
an `(!isDriver || isLocalMode)` condition, where `isLocalMode` is determined by 
checking whether `spark.master` starts with "local"
    2. **RssShuffleManager.java (Spark 3)**: Added null check for 
`dataPusher.setRssAppId()` calls
    3. **RssShuffleManager.java (Spark 2)**: Added null check for 
`dataPusher.setRssAppId()` calls
    
    Closes #1603
---
 .../shuffle/manager/RssShuffleManagerBase.java     | 65 ++++++++++++----------
 .../apache/spark/shuffle/RssShuffleManager.java    |  8 ++-
 .../apache/spark/shuffle/RssShuffleManager.java    |  8 ++-
 3 files changed, 48 insertions(+), 33 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index b426d6145..fdd700b44 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -343,35 +343,42 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
     this.shuffleWriteClient = createShuffleWriteClient();
     registerCoordinator();
 
-    LOG.info("Rss data pusher is starting...");
-    int poolSize = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
-    int keepAliveTime = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
-
-    boolean overlappingCompressionEnabled =
-        rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
-    int overlappingCompressionThreadsPerVcore =
-        
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
-    if (overlappingCompressionEnabled && overlappingCompressionThreadsPerVcore 
> 0) {
-      int compressionThreads =
-          overlappingCompressionThreadsPerVcore * 
sparkConf.getInt(EXECUTOR_CORES, 1);
-      this.dataPusher =
-          new OverlappingCompressionDataPusher(
-              shuffleWriteClient,
-              taskToSuccessBlockIds,
-              taskToFailedBlockSendTracker,
-              failedTaskIds,
-              poolSize,
-              keepAliveTime,
-              compressionThreads);
-    } else {
-      this.dataPusher =
-          new DataPusher(
-              shuffleWriteClient,
-              taskToSuccessBlockIds,
-              taskToFailedBlockSendTracker,
-              failedTaskIds,
-              poolSize,
-              keepAliveTime);
+    // Only initialize dataPusher for executor or for driver in local mode.
+    // In local mode, driver also acts as executor and needs dataPusher.
+    // In cluster mode, pure driver doesn't push shuffle data.
+    String sparkMaster = sparkConf.get("spark.master", "");
+    boolean isLocalMode = sparkMaster.startsWith("local");
+    if (!isDriver || isLocalMode) {
+      LOG.info("Rss data pusher is starting...");
+      int poolSize = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
+      int keepAliveTime = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
+
+      boolean overlappingCompressionEnabled =
+          
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
+      int overlappingCompressionThreadsPerVcore =
+          
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
+      if (overlappingCompressionEnabled && 
overlappingCompressionThreadsPerVcore > 0) {
+        int compressionThreads =
+            overlappingCompressionThreadsPerVcore * 
sparkConf.getInt(EXECUTOR_CORES, 1);
+        this.dataPusher =
+            new OverlappingCompressionDataPusher(
+                shuffleWriteClient,
+                taskToSuccessBlockIds,
+                taskToFailedBlockSendTracker,
+                failedTaskIds,
+                poolSize,
+                keepAliveTime,
+                compressionThreads);
+      } else {
+        this.dataPusher =
+            new DataPusher(
+                shuffleWriteClient,
+                taskToSuccessBlockIds,
+                taskToFailedBlockSendTracker,
+                failedTaskIds,
+                poolSize,
+                keepAliveTime);
+      }
     }
 
     this.partitionReassignMaxServerNum =
diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5b096c50f..d6c3bd5a4 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -110,7 +110,9 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     // will be called many times depend on how many shuffle stage
     if ("".equals(appId)) {
       appId = SparkEnv.get().conf().getAppId() + "_" + uuid;
-      dataPusher.setRssAppId(appId);
+      if (dataPusher != null) {
+        dataPusher.setRssAppId(appId);
+      }
       LOG.info("Generate application id used in rss: " + appId);
     }
 
@@ -203,7 +205,9 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     if (handle instanceof RssShuffleHandle) {
       RssShuffleHandle<K, V, ?> rssHandle = (RssShuffleHandle<K, V, ?>) handle;
       appId = rssHandle.getAppId();
-      dataPusher.setRssAppId(appId);
+      if (dataPusher != null) {
+        dataPusher.setRssAppId(appId);
+      }
 
       int shuffleId = rssHandle.getShuffleId();
       String taskId = "" + context.taskAttemptId() + "_" + 
context.attemptNumber();
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index b81fd479d..776a15329 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -142,7 +142,9 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     if (id.get() == null) {
       id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + uuid);
       appId = id.get();
-      dataPusher.setRssAppId(id.get());
+      if (dataPusher != null) {
+        dataPusher.setRssAppId(id.get());
+      }
     }
     LOG.info("Generate application id used in rss: " + id.get());
     // If stage retry is enabled, the Deterministic status of the ShuffleId 
needs to be recorded.
@@ -273,7 +275,9 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
     // todo: this implement is tricky, we should refactor it
     if (id.get() == null) {
       id.compareAndSet(null, rssShuffleHandle.getAppId());
-      dataPusher.setRssAppId(id.get());
+      if (dataPusher != null) {
+        dataPusher.setRssAppId(id.get());
+      }
     }
   }
 

Reply via email to