This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 5eda851ae [CELEBORN-1625] Add parameter `skipCompress` for
pushOrMergeData
5eda851ae is described below
commit 5eda851aeb9bd72caf9b668ff6abdac4705f9ab6
Author: xiyu.zk <[email protected]>
AuthorDate: Mon Sep 30 09:49:12 2024 +0800
[CELEBORN-1625] Add parameter `skipCompress` for pushOrMergeData
### What changes were proposed in this pull request?
Add new parameters to the pushOrMergeData method for gluten
### Why are the changes needed?
Currently, in the scenario of Gluten Fallback, it is possible for a stage
to have both Native and Java shuffles simultaneously. Since the ShuffleClient
is a singleton, the compression settings for the ShuffleClient will only
utilize either the Native or Java configuration. By adding new parameters, we
aim to allow Gluten to control whether compression should be applied in this
scenario.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CI
Closes #2769 from kerwin-zk/CELEBORN-1625.
Authored-by: xiyu.zk <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 3b2e70cafaa1ce42525dc287556901fe955bec24)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index c2d9a6358..1c86182d7 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -889,7 +889,8 @@ public class ShuffleClientImpl extends ShuffleClient {
int length,
int numMappers,
int numPartitions,
- boolean doPush)
+ boolean doPush,
+ boolean skipCompress)
throws IOException {
// mapKey
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
@@ -954,7 +955,7 @@ public class ShuffleClientImpl extends ShuffleClient {
// increment batchId
final int nextBatchId = pushState.nextBatchId();
- if (shuffleCompressionEnabled) {
+ if (shuffleCompressionEnabled && !skipCompress) {
// compress data
final Compressor compressor = compressorThreadLocal.get();
compressor.compress(data, offset, length);
@@ -1260,7 +1261,8 @@ public class ShuffleClientImpl extends ShuffleClient {
length,
numMappers,
numPartitions,
- true);
+ true,
+ false);
}
@Override
@@ -1294,6 +1296,7 @@ public class ShuffleClientImpl extends ShuffleClient {
length,
numMappers,
numPartitions,
+ false,
false);
}