This is an automated email from the ASF dual-hosted git repository.

kerwinzhang pushed a commit to branch celeborn-755
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/celeborn-755 by this push:
     new 31a699914 [CELEBORN-755] Support to decide whether to compress shuffle 
data through configuration
31a699914 is described below

commit 31a6999142ead478e249d9f78bf050758070a1e9
Author: xiyu.zk <[email protected]>
AuthorDate: Fri Jun 30 17:52:01 2023 +0800

    [CELEBORN-755] Support to decide whether to compress shuffle data through 
configuration
---
 .../main/java/org/apache/celeborn/client/ShuffleClientImpl.java   | 4 +++-
 .../main/java/org/apache/celeborn/client/read/RssInputStream.java | 8 +++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java 
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index faa446338..a59c54ebb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -101,6 +101,7 @@ public class ShuffleClientImpl extends ShuffleClient {
   protected final Map<String, PushState> pushStates = 
JavaUtils.newConcurrentHashMap();
 
   private final boolean pushExcludeWorkerOnFailureEnabled;
+  private final boolean shuffleCompressionEnabled;
   private final Set<String> pushExcludedWorkers = 
ConcurrentHashMap.newKeySet();
   private final ConcurrentHashMap<String, Long> fetchExcludedWorkers =
       JavaUtils.newConcurrentHashMap();
@@ -164,6 +165,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     testRetryRevive = conf.testRetryRevive();
     pushBufferMaxSize = conf.clientPushBufferMaxSize();
     pushExcludeWorkerOnFailureEnabled = 
conf.clientPushExcludeWorkerOnFailureEnabled();
+    shuffleCompressionEnabled = conf.shuffleCompressionEnabled();
     if (conf.clientPushReplicateEnabled()) {
       pushDataTimeout = conf.pushDataTimeoutMs() * 2;
     } else {
@@ -837,7 +839,7 @@ public class ShuffleClientImpl extends ShuffleClient {
     int totalSize = data.length;
     byte[] shuffleDataBuf = data;
 
-    if (conf.shuffleCompressionEnabled()) {
+    if (shuffleCompressionEnabled) {
       // compress data
       final Compressor compressor = compressorThreadLocal.get();
       compressor.compress(data, offset, length);
diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index f3615438f..41f6709d2 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -131,6 +131,7 @@ public abstract class RssInputStream extends InputStream {
 
     private boolean pushReplicateEnabled;
     private boolean fetchExcludeWorkerOnFailureEnabled;
+    private boolean shuffleCompressionEnabled;
     private long fetchExcludedWorkerExpireTimeout;
     private final ConcurrentHashMap<String, Long> fetchExcludedWorkers;
 
@@ -156,11 +157,12 @@ public abstract class RssInputStream extends InputStream {
       this.rangeReadFilter = conf.shuffleRangeReadFilterEnabled();
       this.pushReplicateEnabled = conf.clientPushReplicateEnabled();
       this.fetchExcludeWorkerOnFailureEnabled = 
conf.clientFetchExcludeWorkerOnFailureEnabled();
+      this.shuffleCompressionEnabled = conf.shuffleCompressionEnabled();
       this.fetchExcludedWorkerExpireTimeout = 
conf.clientFetchExcludedWorkerExpireTimeout();
       this.fetchExcludedWorkers = fetchExcludedWorkers;
 
       int blockSize = conf.clientPushBufferMaxSize();
-      if (conf.shuffleCompressionEnabled()) {
+      if (shuffleCompressionEnabled) {
         int headerLen = Decompressor.getCompressionHeaderLength(conf);
         blockSize = conf.clientPushBufferMaxSize() + headerLen;
         compressedBuf = new byte[blockSize];
@@ -516,7 +518,7 @@ public abstract class RssInputStream extends InputStream {
         int batchId = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 8);
         int size = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 12);
 
-        if (conf.shuffleCompressionEnabled()) {
+        if (shuffleCompressionEnabled) {
           if (size > compressedBuf.length) {
             compressedBuf = new byte[size];
           }
@@ -542,7 +544,7 @@ public abstract class RssInputStream extends InputStream {
             if (callback != null) {
               callback.incBytesRead(BATCH_HEADER_SIZE + size);
             }
-            if (conf.shuffleCompressionEnabled()) {
+            if (shuffleCompressionEnabled) {
               // decompress data
               int originalLength = decompressor.getOriginalLen(compressedBuf);
               if (rawDataBuf.length < originalLength) {

Reply via email to