This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 52c72f2f2df [FLINK-36021][network] Delegating the responsibility for compression to every tier
52c72f2f2df is described below

commit 52c72f2f2dfcc85d2e5ce78d866f6895e7f65adc
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Aug 9 15:16:57 2024 +0800

    [FLINK-36021][network] Delegating the responsibility for compression to every tier
---
 .../hybrid/tiered/common/TieredStorageUtils.java   | 42 ++++++++++++++++++++++
 .../storage/TieredStorageProducerClient.java       | 24 +++----------
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   | 10 +++++-
 .../tiered/tier/disk/DiskTierProducerAgent.java    | 15 +++++---
 .../tiered/tier/memory/MemoryTierFactory.java      | 10 +++++-
 .../tier/memory/MemoryTierProducerAgent.java       | 16 ++++++---
 .../tiered/tier/remote/RemoteTierFactory.java      |  9 ++++-
 .../tier/remote/RemoteTierProducerAgent.java       | 16 ++++++---
 .../tier/disk/DiskTierProducerAgentTest.java       |  3 +-
 .../tier/memory/MemoryTierProducerAgentTest.java   |  6 ++--
 10 files changed, 113 insertions(+), 38 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
index 8565daf14d2..562b7c529d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
@@ -19,11 +19,16 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory;
@@ -32,6 +37,8 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.R
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** Utils for reading from or writing to tiered storage. */
 public class TieredStorageUtils {
 
@@ -128,4 +135,39 @@ public class TieredStorageUtils {
         bufferWithHeaders[index] = header;
         bufferWithHeaders[index + 1] = buffer.getNioBufferReadable();
     }
+
+    /** Try compress buffer if possible. */
+    public static Buffer compressBufferIfPossible(
+            Buffer buffer, BufferCompressor bufferCompressor) {
+        if (!canBeCompressed(buffer, bufferCompressor)) {
+            return buffer;
+        }
+
+        return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
+    }
+
+    /**
+     * Whether the buffer can be compressed or not. Note that event is not compressed because it is
+     * usually small and the size can become even larger after compression.
+     */
+    public static boolean canBeCompressed(Buffer buffer, BufferCompressor bufferCompressor) {
+        return bufferCompressor != null && buffer.isBuffer() && buffer.readableBytes() > 0;
+    }
+
+    /**
+     * Construct the {@link BufferCompressor} from configuration.
+     *
+     * <p>Note: This is just a workaround for released version as we can not change the interface of
+     * {@link TierFactory}.
+     */
+    public static BufferCompressor buildBufferCompressor(
+            int bufferSizeBytes, Configuration configuration) {
+        CompressionCodec compressionCodec =
+                configuration.get(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);
+        boolean compressionEnabled =
+                configuration.get(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED);
+        return compressionEnabled && compressionCodec != CompressionCodec.NONE
+                ? new BufferCompressor(bufferSizeBytes, compressionCodec)
+                : null;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 80a712d9146..4586444e91b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -145,21 +145,21 @@ public class TieredStorageProducerClient {
             TieredStorageSubpartitionId subpartitionId,
             Buffer accumulatedBuffer,
             int numRemainingConsecutiveBuffers) {
+        int unCompressedSize = accumulatedBuffer.readableBytes();
         try {
-            Buffer compressedBuffer = compressBufferIfPossible(accumulatedBuffer);
             if (currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] == null) {
                 chooseStorageTierToStartSegment(subpartitionId, numRemainingConsecutiveBuffers + 1);
             }
             if (!currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
                     subpartitionId,
-                    compressedBuffer,
+                    accumulatedBuffer,
                     bufferAccumulator,
                     numRemainingConsecutiveBuffers)) {
                 chooseStorageTierToStartSegment(subpartitionId, numRemainingConsecutiveBuffers + 1);
                 checkState(
                         currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
                                 subpartitionId,
-                                compressedBuffer,
+                                accumulatedBuffer,
                                 bufferAccumulator,
                                 numRemainingConsecutiveBuffers),
                         "Failed to write the first buffer to the new segment");
@@ -168,7 +168,7 @@ public class TieredStorageProducerClient {
             accumulatedBuffer.recycleBuffer();
             ExceptionUtils.rethrow(ioe);
         }
-        updateMetricStatistics(1, accumulatedBuffer.readableBytes());
+        updateMetricStatistics(1, unCompressedSize);
     }
 
     private void chooseStorageTierToStartSegment(
@@ -189,22 +189,6 @@ public class TieredStorageProducerClient {
         throw new IOException("Failed to choose a storage tier to start a new 
segment.");
     }
 
-    private Buffer compressBufferIfPossible(Buffer buffer) {
-        if (!canBeCompressed(buffer)) {
-            return buffer;
-        }
-
-        return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
-    }
-
-    /**
-     * Whether the buffer can be compressed or not. Note that event is not 
compressed because it is
-     * usually small and the size can become even larger after compression.
-     */
-    private boolean canBeCompressed(Buffer buffer) {
-        return bufferCompressor != null && buffer.isBuffer() && 
buffer.readableBytes() > 0;
-    }
-
     private void updateMetricStatistics(int numWriteBuffersDelta, int 
numWriteBytesDelta) {
         checkNotNull(metricStatisticsUpdater)
                 .accept(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
index 786aab43c5f..a750fe17e1e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
@@ -38,15 +38,19 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
+import javax.annotation.Nullable;
+
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.buildBufferCompressor;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.DATA_FILE_SUFFIX;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.INDEX_FILE_SUFFIX;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** The implementation of {@link TierFactory} for disk tier. */
@@ -68,9 +72,12 @@ public class DiskTierFactory implements TierFactory {
 
     private int bufferSizeBytes = -1;
 
+    @Nullable private Configuration conf;
+
     @Override
     public void setup(Configuration configuration) {
         this.bufferSizeBytes = 
ConfigurationParserUtils.getPageSize(configuration);
+        this.conf = checkNotNull(configuration);
     }
 
     @Override
@@ -139,7 +146,8 @@ public class DiskTierFactory implements TierFactory {
                 bufferPool,
                 ioExecutor,
                 maxRequestedBuffers,
-                DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT);
+                DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT,
+                buildBufferCompressor(bufferSizeBytes, checkNotNull(conf)));
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index e8d35ea6e39..366a64e033c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
@@ -48,6 +49,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -80,6 +82,8 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
 
     private final DiskIOScheduler diskIOScheduler;
 
+    private final BufferCompressor bufferCompressor;
+
     private volatile boolean isReleased;
 
     DiskTierProducerAgent(
@@ -99,7 +103,8 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
             int maxRequestedBuffers,
-            Duration bufferRequestTimeout) {
+            Duration bufferRequestTimeout,
+            BufferCompressor bufferCompressor) {
         checkArgument(
                 numBytesPerSegment >= bufferSizeBytes,
                 "One segment should contain at least one buffer.");
@@ -112,6 +117,7 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
         this.memoryManager = memoryManager;
         this.firstBufferIndexInSegment = new ArrayList<>();
         this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
+        this.bufferCompressor = bufferCompressor;
 
         for (int i = 0; i < numSubpartitions; ++i) {
             // Each map is used to store the segment ids belonging to a 
subpartition. The map can be
@@ -177,11 +183,12 @@ public class DiskTierProducerAgent implements 
TierProducerAgent, NettyServicePro
             currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
             return false;
         }
-        if (finishedBuffer.isBuffer()) {
-            memoryManager.transferBufferOwnership(bufferOwner, 
getDiskTierName(), finishedBuffer);
+        Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer, 
bufferCompressor);
+        if (compressedBuffer.isBuffer()) {
+            memoryManager.transferBufferOwnership(bufferOwner, 
getDiskTierName(), compressedBuffer);
         }
         currentSubpartitionWriteBuffers[subpartitionIndex]++;
-        emitBuffer(finishedBuffer, subpartitionIndex, 
numRemainingConsecutiveBuffers == 0);
+        emitBuffer(compressedBuffer, subpartitionIndex, 
numRemainingConsecutiveBuffers == 0);
         return true;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
index b2d84acb364..77fddde1742 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
@@ -34,10 +34,14 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.buildBufferCompressor;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** The implementation of {@link TierFactory} for memory tier. */
@@ -51,9 +55,12 @@ public class MemoryTierFactory implements TierFactory {
 
     private int bufferSizeBytes = -1;
 
+    @Nullable private Configuration conf;
+
     @Override
     public void setup(Configuration configuration) {
         this.bufferSizeBytes = 
ConfigurationParserUtils.getPageSize(configuration);
+        this.conf = checkNotNull(configuration);
     }
 
     @Override
@@ -103,7 +110,8 @@ public class MemoryTierFactory implements TierFactory {
                 isBroadcastOnly,
                 memoryManager,
                 nettyService,
-                resourceRegistry);
+                resourceRegistry,
+                buildBufferCompressor(bufferSizeBytes, checkNotNull(conf)));
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
index 973b8cdf741..54ba4cdd71f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
@@ -40,6 +41,7 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -66,6 +68,8 @@ public class MemoryTierProducerAgent implements 
TierProducerAgent, NettyServiceP
 
     private final MemoryTierSubpartitionProducerAgent[] 
subpartitionProducerAgents;
 
+    private final BufferCompressor bufferCompressor;
+
     public MemoryTierProducerAgent(
             TieredStoragePartitionId partitionId,
             int numSubpartitions,
@@ -75,7 +79,8 @@ public class MemoryTierProducerAgent implements 
TierProducerAgent, NettyServiceP
             boolean isBroadcastOnly,
             TieredStorageMemoryManager memoryManager,
             TieredStorageNettyService nettyService,
-            TieredStorageResourceRegistry resourceRegistry) {
+            TieredStorageResourceRegistry resourceRegistry,
+            BufferCompressor bufferCompressor) {
         checkArgument(
                 segmentSizeBytes >= bufferSizeBytes,
                 "One segment should contain at least one buffer.");
@@ -89,6 +94,7 @@ public class MemoryTierProducerAgent implements 
TierProducerAgent, NettyServiceP
         this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
         this.nettyConnectionEstablished = new boolean[numSubpartitions];
         this.subpartitionProducerAgents = new 
MemoryTierSubpartitionProducerAgent[numSubpartitions];
+        this.bufferCompressor = bufferCompressor;
 
         Arrays.fill(currentSubpartitionWriteBuffers, 0);
         nettyService.registerProducer(partitionId, this);
@@ -138,11 +144,13 @@ public class MemoryTierProducerAgent implements 
TierProducerAgent, NettyServiceP
             currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
             return false;
         }
-        if (finishedBuffer.isBuffer()) {
-            memoryManager.transferBufferOwnership(bufferOwner, 
getMemoryTierName(), finishedBuffer);
+        Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer, 
bufferCompressor);
+        if (compressedBuffer.isBuffer()) {
+            memoryManager.transferBufferOwnership(
+                    bufferOwner, getMemoryTierName(), compressedBuffer);
         }
         currentSubpartitionWriteBuffers[subpartitionIndex]++;
-        addFinishedBuffer(finishedBuffer, subpartitionIndex);
+        addFinishedBuffer(compressedBuffer, subpartitionIndex);
         return true;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
index 5c0f72ecea5..bcb36a7fb47 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
@@ -37,9 +37,12 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.buildBufferCompressor;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -57,9 +60,12 @@ public class RemoteTierFactory implements TierFactory {
 
     private String remoteStoragePath = DEFAULT_REMOTE_STORAGE_BASE_PATH;
 
+    @Nullable private Configuration conf;
+
     @Override
     public void setup(Configuration configuration) {
         this.bufferSizeBytes = 
ConfigurationParserUtils.getPageSize(configuration);
+        this.conf = checkNotNull(configuration);
         this.remoteStoragePath =
                 checkNotNull(
                         configuration.get(
@@ -115,7 +121,8 @@ public class RemoteTierFactory implements TierFactory {
                 isBroadcastOnly,
                 partitionFileWriter,
                 storageMemoryManager,
-                resourceRegistry);
+                resourceRegistry,
+                buildBufferCompressor(bufferSizeBytes, checkNotNull(conf)));
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
index 8444c29fec9..e3cfcb960aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
@@ -19,6 +19,7 @@
 package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
@@ -28,6 +29,7 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
 
 import java.util.Arrays;
 
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
 import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -44,6 +46,8 @@ public class RemoteTierProducerAgent implements 
TierProducerAgent {
 
     private final int[] currentSubpartitionSegmentWriteBuffers;
 
+    private final BufferCompressor bufferCompressor;
+
     RemoteTierProducerAgent(
             TieredStoragePartitionId partitionId,
             int numSubpartitions,
@@ -52,7 +56,8 @@ public class RemoteTierProducerAgent implements 
TierProducerAgent {
             boolean isBroadcastOnly,
             PartitionFileWriter partitionFileWriter,
             TieredStorageMemoryManager memoryManager,
-            TieredStorageResourceRegistry resourceRegistry) {
+            TieredStorageResourceRegistry resourceRegistry,
+            BufferCompressor bufferCompressor) {
         checkArgument(
                 numBytesPerSegment >= bufferSizeBytes,
                 "One segment should contain at least one buffer.");
@@ -67,6 +72,7 @@ public class RemoteTierProducerAgent implements 
TierProducerAgent {
                         memoryManager,
                         partitionFileWriter);
         this.currentSubpartitionSegmentWriteBuffers = new 
int[numSubpartitions];
+        this.bufferCompressor = bufferCompressor;
         Arrays.fill(currentSubpartitionSegmentWriteBuffers, 0);
         resourceRegistry.registerResource(partitionId, 
this::releaseAllResources);
     }
@@ -95,11 +101,13 @@ public class RemoteTierProducerAgent implements 
TierProducerAgent {
             currentSubpartitionSegmentWriteBuffers[subpartitionIndex] = 0;
             return false;
         }
-        if (buffer.isBuffer()) {
-            memoryManager.transferBufferOwnership(bufferOwner, 
getRemoteTierName(), buffer);
+        Buffer compressedBuffer = compressBufferIfPossible(buffer, 
bufferCompressor);
+        if (compressedBuffer.isBuffer()) {
+            memoryManager.transferBufferOwnership(
+                    bufferOwner, getRemoteTierName(), compressedBuffer);
         }
         currentSubpartitionSegmentWriteBuffers[subpartitionIndex]++;
-        cacheDataManager.appendBuffer(buffer, subpartitionIndex);
+        cacheDataManager.appendBuffer(compressedBuffer, subpartitionIndex);
         return true;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
index 7444dcda671..33a3682c0e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
@@ -229,6 +229,7 @@ public class DiskTierProducerAgentTest {
                 new BatchShuffleReadBufferPool(1, 1),
                 new ManuallyTriggeredScheduledExecutorService(),
                 0,
-                Duration.ofMinutes(5));
+                Duration.ofMinutes(5),
+                null);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
index 170ce2e018a..873ccc70197 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
@@ -121,7 +121,8 @@ class MemoryTierProducerAgentTest {
                         false,
                         memoryManager,
                         nettyService,
-                        new TieredStorageResourceRegistry())) {
+                        new TieredStorageResourceRegistry(),
+                        null)) {
             memoryTierProducerAgent.connectionEstablished(
                     SUBPARTITION_ID, new 
TestingNettyConnectionWriter.Builder().build());
             
assertThat(memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0, 
0)).isFalse();
@@ -220,6 +221,7 @@ class MemoryTierProducerAgentTest {
                 isBroadcastOnly,
                 memoryManager,
                 nettyService,
-                resourceRegistry);
+                resourceRegistry,
+                null);
     }
 }

Reply via email to