This is an automated email from the ASF dual-hosted git repository.
kerwinzhang pushed a commit to branch celeborn-755
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/celeborn-755 by this push:
new 6853ee6ea [CELEBORN-755] Support to decide whether to compress shuffle
data through configuration
6853ee6ea is described below
commit 6853ee6ea9834560901db621d02fae4c369659fb
Author: xiyu.zk <[email protected]>
AuthorDate: Fri Jun 30 18:21:46 2023 +0800
[CELEBORN-755] Support to decide whether to compress shuffle data through
configuration
---
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 2 +-
.../src/main/java/org/apache/celeborn/client/read/RssInputStream.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index f9ef18842..79443187d 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -165,7 +165,7 @@ public class ShuffleClientImpl extends ShuffleClient {
testRetryRevive = conf.testRetryRevive();
pushBufferMaxSize = conf.clientPushBufferMaxSize();
pushExcludeWorkerOnFailureEnabled = conf.clientPushExcludeWorkerOnFailureEnabled();
- shuffleCompressionEnabled = conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
+ shuffleCompressionEnabled = !conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
if (conf.clientPushReplicateEnabled()) {
pushDataTimeout = conf.pushDataTimeoutMs() * 2;
} else {
diff --git a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index 4821b871a..3db936a95 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -158,7 +158,7 @@ public abstract class RssInputStream extends InputStream {
this.rangeReadFilter = conf.shuffleRangeReadFilterEnabled();
this.pushReplicateEnabled = conf.clientPushReplicateEnabled();
this.fetchExcludeWorkerOnFailureEnabled = conf.clientFetchExcludeWorkerOnFailureEnabled();
- this.shuffleCompressionEnabled = conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
+ this.shuffleCompressionEnabled = !conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
this.fetchExcludedWorkerExpireTimeout = conf.clientFetchExcludedWorkerExpireTimeout();
this.fetchExcludedWorkers = fetchExcludedWorkers;