This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8cb662524 [#1603] feat(spark): Disable dataPusher initialization for
Spark Driver (#2688)
8cb662524 is described below
commit 8cb662524f7e215391dfb77edfefb0ade8faa745
Author: zhan7236 <[email protected]>
AuthorDate: Mon Dec 1 10:34:28 2025 +0800
[#1603] feat(spark): Disable dataPusher initialization for Spark Driver
(#2688)
### What changes were proposed in this pull request?
This PR disables the `dataPusher` initialization for Spark driver in
cluster mode, as it's only needed for executors to push shuffle data.
### Why are the changes needed?
The `dataPusher` is used to push shuffle data to shuffle servers, which is
an executor-side operation. In cluster mode, the driver does not push shuffle
data, so initializing `dataPusher` (along with its thread pool) for the driver
is unnecessary and wastes resources.
**Note**: In local mode, the driver also acts as an executor, so `dataPusher`
is still initialized in that case.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
- Compiled successfully for both Spark 2 and Spark 3 profiles
- All unit tests pass
- Integration tests pass (tested `CombineByKeyTest` and `GroupByKeyTest`
which were previously failing)
### Summary of Changes:
1. **RssShuffleManagerBase.java**: Wrapped `dataPusher` initialization with
`(!isDriver || isLocalMode)` condition, where `isLocalMode` is determined by
checking if `spark.master` starts with "local"
2. **RssShuffleManager.java (Spark 3)**: Added null check for
`dataPusher.setRssAppId()` calls
3. **RssShuffleManager.java (Spark 2)**: Added null check for
`dataPusher.setRssAppId()` calls
Closes #1603
---
.../shuffle/manager/RssShuffleManagerBase.java | 65 ++++++++++++----------
.../apache/spark/shuffle/RssShuffleManager.java | 8 ++-
.../apache/spark/shuffle/RssShuffleManager.java | 8 ++-
3 files changed, 48 insertions(+), 33 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index b426d6145..fdd700b44 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -343,35 +343,42 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterface
this.shuffleWriteClient = createShuffleWriteClient();
registerCoordinator();
- LOG.info("Rss data pusher is starting...");
- int poolSize =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
- int keepAliveTime =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
-
- boolean overlappingCompressionEnabled =
- rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
- int overlappingCompressionThreadsPerVcore =
-
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
- if (overlappingCompressionEnabled && overlappingCompressionThreadsPerVcore
> 0) {
- int compressionThreads =
- overlappingCompressionThreadsPerVcore *
sparkConf.getInt(EXECUTOR_CORES, 1);
- this.dataPusher =
- new OverlappingCompressionDataPusher(
- shuffleWriteClient,
- taskToSuccessBlockIds,
- taskToFailedBlockSendTracker,
- failedTaskIds,
- poolSize,
- keepAliveTime,
- compressionThreads);
- } else {
- this.dataPusher =
- new DataPusher(
- shuffleWriteClient,
- taskToSuccessBlockIds,
- taskToFailedBlockSendTracker,
- failedTaskIds,
- poolSize,
- keepAliveTime);
+ // Only initialize dataPusher for executor or for driver in local mode.
+ // In local mode, driver also acts as executor and needs dataPusher.
+ // In cluster mode, pure driver doesn't push shuffle data.
+ String sparkMaster = sparkConf.get("spark.master", "");
+ boolean isLocalMode = sparkMaster.startsWith("local");
+ if (!isDriver || isLocalMode) {
+ LOG.info("Rss data pusher is starting...");
+ int poolSize =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
+ int keepAliveTime =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
+
+ boolean overlappingCompressionEnabled =
+
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
+ int overlappingCompressionThreadsPerVcore =
+
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
+ if (overlappingCompressionEnabled &&
overlappingCompressionThreadsPerVcore > 0) {
+ int compressionThreads =
+ overlappingCompressionThreadsPerVcore *
sparkConf.getInt(EXECUTOR_CORES, 1);
+ this.dataPusher =
+ new OverlappingCompressionDataPusher(
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockSendTracker,
+ failedTaskIds,
+ poolSize,
+ keepAliveTime,
+ compressionThreads);
+ } else {
+ this.dataPusher =
+ new DataPusher(
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockSendTracker,
+ failedTaskIds,
+ poolSize,
+ keepAliveTime);
+ }
}
this.partitionReassignMaxServerNum =
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5b096c50f..d6c3bd5a4 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -110,7 +110,9 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
// will be called many times depend on how many shuffle stage
if ("".equals(appId)) {
appId = SparkEnv.get().conf().getAppId() + "_" + uuid;
- dataPusher.setRssAppId(appId);
+ if (dataPusher != null) {
+ dataPusher.setRssAppId(appId);
+ }
LOG.info("Generate application id used in rss: " + appId);
}
@@ -203,7 +205,9 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (handle instanceof RssShuffleHandle) {
RssShuffleHandle<K, V, ?> rssHandle = (RssShuffleHandle<K, V, ?>) handle;
appId = rssHandle.getAppId();
- dataPusher.setRssAppId(appId);
+ if (dataPusher != null) {
+ dataPusher.setRssAppId(appId);
+ }
int shuffleId = rssHandle.getShuffleId();
String taskId = "" + context.taskAttemptId() + "_" +
context.attemptNumber();
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index b81fd479d..776a15329 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -142,7 +142,9 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (id.get() == null) {
id.compareAndSet(null, SparkEnv.get().conf().getAppId() + "_" + uuid);
appId = id.get();
- dataPusher.setRssAppId(id.get());
+ if (dataPusher != null) {
+ dataPusher.setRssAppId(id.get());
+ }
}
LOG.info("Generate application id used in rss: " + id.get());
// If stage retry is enabled, the Deterministic status of the ShuffleId
needs to be recorded.
@@ -273,7 +275,9 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
// todo: this implement is tricky, we should refactor it
if (id.get() == null) {
id.compareAndSet(null, rssShuffleHandle.getAppId());
- dataPusher.setRssAppId(id.get());
+ if (dataPusher != null) {
+ dataPusher.setRssAppId(id.get());
+ }
}
}