This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 5eda851ae [CELEBORN-1625] Add parameter `skipCompress` for 
pushOrMergeData
5eda851ae is described below

commit 5eda851aeb9bd72caf9b668ff6abdac4705f9ab6
Author: xiyu.zk <[email protected]>
AuthorDate: Mon Sep 30 09:49:12 2024 +0800

    [CELEBORN-1625] Add parameter `skipCompress` for pushOrMergeData
    
    ### What changes were proposed in this pull request?
    Add new parameters to the pushOrMergeData method for gluten
    
    ### Why are the changes needed?
    Currently, in the scenario of Gluten Fallback, it is possible for a stage 
to have both Native and Java shuffles simultaneously. Since the ShuffleClient 
is a singleton, the compression settings for the ShuffleClient will only 
utilize either the Native or Java configuration. By adding new parameters, we 
aim to allow Gluten to control whether compression should be applied in this 
scenario.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    CI
    
    Closes #2769 from kerwin-zk/CELEBORN-1625.
    
    Authored-by: xiyu.zk <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 3b2e70cafaa1ce42525dc287556901fe955bec24)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../main/java/org/apache/celeborn/client/ShuffleClientImpl.java  | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index c2d9a6358..1c86182d7 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -889,7 +889,8 @@ public class ShuffleClientImpl extends ShuffleClient {
       int length,
       int numMappers,
       int numPartitions,
-      boolean doPush)
+      boolean doPush,
+      boolean skipCompress)
       throws IOException {
     // mapKey
     final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
@@ -954,7 +955,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     // increment batchId
     final int nextBatchId = pushState.nextBatchId();
 
-    if (shuffleCompressionEnabled) {
+    if (shuffleCompressionEnabled && !skipCompress) {
       // compress data
       final Compressor compressor = compressorThreadLocal.get();
       compressor.compress(data, offset, length);
@@ -1260,7 +1261,8 @@ public class ShuffleClientImpl extends ShuffleClient {
         length,
         numMappers,
         numPartitions,
-        true);
+        true,
+        false);
   }
 
   @Override
@@ -1294,6 +1296,7 @@ public class ShuffleClientImpl extends ShuffleClient {
         length,
         numMappers,
         numPartitions,
+        false,
         false);
   }
 

Reply via email to