This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.4
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.4 by this push:
     new d3fd5dfc1 [CELEBORN-1363] AbstractRemoteShuffleInputGateFactory 
supports celeborn.client.shuffle.compression.codec to configure compression 
codec
d3fd5dfc1 is described below

commit d3fd5dfc136b287021dc8861bacc7ffcee9deeee
Author: SteNicholas <[email protected]>
AuthorDate: Mon Apr 1 11:32:44 2024 +0800

    [CELEBORN-1363] AbstractRemoteShuffleInputGateFactory supports 
celeborn.client.shuffle.compression.codec to configure compression codec
    
    ### What changes were proposed in this pull request?
    
    `AbstractRemoteShuffleInputGateFactory` supports 
`celeborn.client.shuffle.compression.codec` to configure compression codec.
    
    ### Why are the changes needed?
    
    `AbstractRemoteShuffleInputGateFactory` only supports LZ4 compression codec 
via hard code at present. `AbstractRemoteShuffleInputGateFactory` should 
support `celeborn.client.shuffle.compression.codec` to configure compression 
codec like ZSTD etc.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No.
    
    Closes #2435 from SteNicholas/CELEBORN-1363.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit af5c5060f63bc403cb09fef53ef984aa43d47b67)
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
index c3bc6c298..a101f182c 100644
--- 
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
+++ 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/AbstractRemoteShuffleInputGateFactory.java
@@ -44,9 +44,6 @@ public abstract class AbstractRemoteShuffleInputGateFactory {
   /** Number of max concurrent reading channels. */
   protected final int numConcurrentReading;
 
-  /** Codec used for compression / decompression. */
-  protected static final String compressionCodec = "LZ4";
-
   /** Network buffer size. */
   protected final int networkBufferSize;
 
@@ -104,7 +101,7 @@ public abstract class AbstractRemoteShuffleInputGateFactory 
{
     SupplierWithException<BufferPool, IOException> bufferPoolFactory =
         createBufferPoolFactory(networkBufferPool, numBuffersPerGate, 
supportFloatingBuffers);
     BufferDecompressor bufferDecompressor =
-        new BufferDecompressor(networkBufferSize, compressionCodec);
+        new BufferDecompressor(networkBufferSize, 
celebornConf.shuffleCompressionCodec().name());
 
     return createInputGate(owningTaskName, gateIndex, igdd, bufferPoolFactory, 
bufferDecompressor);
   }

Reply via email to