This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.4 by this push:
new d3fd5dfc1 [CELEBORN-1363] AbstractRemoteShuffleInputGateFactory
supports celeborn.client.shuffle.compression.codec to configure compression
codec
d3fd5dfc1 is described below
commit d3fd5dfc136b287021dc8861bacc7ffcee9deeee
Author: SteNicholas <[email protected]>
AuthorDate: Mon Apr 1 11:32:44 2024 +0800
[CELEBORN-1363] AbstractRemoteShuffleInputGateFactory supports
celeborn.client.shuffle.compression.codec to configure compression codec
### What changes were proposed in this pull request?
`AbstractRemoteShuffleInputGateFactory` now reads the compression codec from
`celeborn.client.shuffle.compression.codec` instead of using a hard-coded value.
### Why are the changes needed?
`AbstractRemoteShuffleInputGateFactory` currently hard-codes LZ4 as its
compression codec. It should instead honor
`celeborn.client.shuffle.compression.codec` so that other codecs, such as
ZSTD, can be configured.
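
The idea can be illustrated with a minimal, self-contained sketch. It is not the Celeborn implementation: the `Codec` enum, the `resolveCodecName` helper, and the plain `Map` standing in for `CelebornConf` are hypothetical, and the LZ4 fallback is an assumption that simply mirrors the previously hard-coded value.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class CodecConfigSketch {
  /** Config key named in this commit. */
  static final String CODEC_KEY = "celeborn.client.shuffle.compression.codec";

  /** Hypothetical stand-in for the codec enum; only the names matter here. */
  enum Codec { LZ4, ZSTD, NONE }

  /**
   * Hypothetical helper: resolves the codec name from a plain map.
   * Falls back to LZ4 when the key is unset (assumption, mirrors the old constant).
   * Throws IllegalArgumentException for a name that is not in the enum.
   */
  static String resolveCodecName(Map<String, String> conf) {
    String raw = conf.getOrDefault(CODEC_KEY, Codec.LZ4.name());
    return Codec.valueOf(raw.trim().toUpperCase(Locale.ROOT)).name();
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    // Key unset: the sketch falls back to LZ4.
    System.out.println(resolveCodecName(conf));

    // Key set: the configured codec is used instead of a constant.
    conf.put(CODEC_KEY, "zstd");
    System.out.println(resolveCodecName(conf));
  }
}
```

In the actual patch the name comes from `celebornConf.shuffleCompressionCodec().name()` and is passed straight to `BufferDecompressor`.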
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No.
Closes #2435 from SteNicholas/CELEBORN-1363.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit af5c5060f63bc403cb09fef53ef984aa43d47b67)
Signed-off-by: mingji <[email protected]>
---
.../celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
index c3bc6c298..a101f182c 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
@@ -44,9 +44,6 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
   /** Number of max concurrent reading channels. */
   protected final int numConcurrentReading;
 
-  /** Codec used for compression / decompression. */
-  protected static final String compressionCodec = "LZ4";
-
   /** Network buffer size. */
   protected final int networkBufferSize;
 
@@ -104,7 +101,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
     SupplierWithException<BufferPool, IOException> bufferPoolFactory =
         createBufferPoolFactory(networkBufferPool, numBuffersPerGate, supportFloatingBuffers);
     BufferDecompressor bufferDecompressor =
-        new BufferDecompressor(networkBufferSize, compressionCodec);
+        new BufferDecompressor(networkBufferSize, celebornConf.shuffleCompressionCodec().name());
     return createInputGate(owningTaskName, gateIndex, igdd, bufferPoolFactory, bufferDecompressor);
   }