This is an automated email from the ASF dual-hosted git repository.
kerwinzhang pushed a commit to branch celeborn-755
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/celeborn-755 by this push:
new 31a699914 [CELEBORN-755] Support to decide whether to compress shuffle
data through configuration
31a699914 is described below
commit 31a6999142ead478e249d9f78bf050758070a1e9
Author: xiyu.zk <[email protected]>
AuthorDate: Fri Jun 30 17:52:01 2023 +0800
[CELEBORN-755] Support to decide whether to compress shuffle data through
configuration
---
.../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 4 +++-
.../main/java/org/apache/celeborn/client/read/RssInputStream.java | 8 +++++---
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index faa446338..a59c54ebb 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -101,6 +101,7 @@ public class ShuffleClientImpl extends ShuffleClient {
protected final Map<String, PushState> pushStates =
JavaUtils.newConcurrentHashMap();
private final boolean pushExcludeWorkerOnFailureEnabled;
+ private final boolean shuffleCompressionEnabled;
private final Set<String> pushExcludedWorkers =
ConcurrentHashMap.newKeySet();
private final ConcurrentHashMap<String, Long> fetchExcludedWorkers =
JavaUtils.newConcurrentHashMap();
@@ -164,6 +165,7 @@ public class ShuffleClientImpl extends ShuffleClient {
testRetryRevive = conf.testRetryRevive();
pushBufferMaxSize = conf.clientPushBufferMaxSize();
pushExcludeWorkerOnFailureEnabled =
conf.clientPushExcludeWorkerOnFailureEnabled();
+ shuffleCompressionEnabled = conf.shuffleCompressionEnabled();
if (conf.clientPushReplicateEnabled()) {
pushDataTimeout = conf.pushDataTimeoutMs() * 2;
} else {
@@ -837,7 +839,7 @@ public class ShuffleClientImpl extends ShuffleClient {
int totalSize = data.length;
byte[] shuffleDataBuf = data;
- if (conf.shuffleCompressionEnabled()) {
+ if (shuffleCompressionEnabled) {
// compress data
final Compressor compressor = compressorThreadLocal.get();
compressor.compress(data, offset, length);
diff --git a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
index f3615438f..41f6709d2 100644
--- a/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
+++ b/client/src/main/java/org/apache/celeborn/client/read/RssInputStream.java
@@ -131,6 +131,7 @@ public abstract class RssInputStream extends InputStream {
private boolean pushReplicateEnabled;
private boolean fetchExcludeWorkerOnFailureEnabled;
+ private boolean shuffleCompressionEnabled;
private long fetchExcludedWorkerExpireTimeout;
private final ConcurrentHashMap<String, Long> fetchExcludedWorkers;
@@ -156,11 +157,12 @@ public abstract class RssInputStream extends InputStream {
this.rangeReadFilter = conf.shuffleRangeReadFilterEnabled();
this.pushReplicateEnabled = conf.clientPushReplicateEnabled();
this.fetchExcludeWorkerOnFailureEnabled =
conf.clientFetchExcludeWorkerOnFailureEnabled();
+ this.shuffleCompressionEnabled = conf.shuffleCompressionEnabled();
this.fetchExcludedWorkerExpireTimeout =
conf.clientFetchExcludedWorkerExpireTimeout();
this.fetchExcludedWorkers = fetchExcludedWorkers;
int blockSize = conf.clientPushBufferMaxSize();
- if (conf.shuffleCompressionEnabled()) {
+ if (shuffleCompressionEnabled) {
int headerLen = Decompressor.getCompressionHeaderLength(conf);
blockSize = conf.clientPushBufferMaxSize() + headerLen;
compressedBuf = new byte[blockSize];
@@ -516,7 +518,7 @@ public abstract class RssInputStream extends InputStream {
int batchId = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 8);
int size = Platform.getInt(sizeBuf, Platform.BYTE_ARRAY_OFFSET + 12);
- if (conf.shuffleCompressionEnabled()) {
+ if (shuffleCompressionEnabled) {
if (size > compressedBuf.length) {
compressedBuf = new byte[size];
}
@@ -542,7 +544,7 @@ public abstract class RssInputStream extends InputStream {
if (callback != null) {
callback.incBytesRead(BATCH_HEADER_SIZE + size);
}
- if (conf.shuffleCompressionEnabled()) {
+ if (shuffleCompressionEnabled) {
// decompress data
int originalLength = decompressor.getOriginalLen(compressedBuf);
if (rawDataBuf.length < originalLength) {