This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c00877260c0 [FLINK-36021][network] Delegating the responsibility for
compression to every tier
c00877260c0 is described below
commit c00877260c0dcce8cfd75b7d3a43cf6f68dcc387
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Aug 9 15:16:57 2024 +0800
[FLINK-36021][network] Delegating the responsibility for compression to
every tier
---
.../hybrid/tiered/common/TieredStorageUtils.java | 21 +++++++++++++++++++
.../shuffle/TieredResultPartitionFactory.java | 9 +++++---
.../storage/TieredStorageProducerClient.java | 24 ++++------------------
.../partition/hybrid/tiered/tier/TierFactory.java | 6 +++++-
.../hybrid/tiered/tier/disk/DiskTierFactory.java | 9 ++++++--
.../tiered/tier/disk/DiskTierProducerAgent.java | 15 ++++++++++----
.../tiered/tier/memory/MemoryTierFactory.java | 9 ++++++--
.../tier/memory/MemoryTierProducerAgent.java | 16 +++++++++++----
.../tiered/tier/remote/RemoteTierFactory.java | 9 ++++++--
.../tier/remote/RemoteTierProducerAgent.java | 16 +++++++++++----
.../hybrid/tiered/storage/TestingTierFactory.java | 6 +++++-
.../tier/disk/DiskTierProducerAgentTest.java | 3 ++-
.../tier/memory/MemoryTierProducerAgentTest.java | 6 ++++--
13 files changed, 103 insertions(+), 46 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
index 8565daf14d2..328573050da 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
@@ -20,6 +20,7 @@ package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
@@ -32,6 +33,8 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.R
import java.nio.ByteBuffer;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/** Utils for reading from or writing to tiered storage. */
public class TieredStorageUtils {
@@ -128,4 +131,22 @@ public class TieredStorageUtils {
bufferWithHeaders[index] = header;
bufferWithHeaders[index + 1] = buffer.getNioBufferReadable();
}
+
+ /** Try compress buffer if possible. */
+ public static Buffer compressBufferIfPossible(
+ Buffer buffer, BufferCompressor bufferCompressor) {
+ if (!canBeCompressed(buffer, bufferCompressor)) {
+ return buffer;
+ }
+
+ return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
+ }
+
+ /**
+ * Whether the buffer can be compressed or not. Note that event is not
compressed because it is
+ * usually small and the size can become even larger after compression.
+ */
+ public static boolean canBeCompressed(Buffer buffer, BufferCompressor
bufferCompressor) {
+ return bufferCompressor != null && buffer.isBuffer() &&
buffer.readableBytes() > 0;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
index 4a00401c4d9..9aaaad28f03 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
@@ -131,7 +131,8 @@ public class TieredResultPartitionFactory {
tierShuffleDescriptors,
fileChannelManager,
batchShuffleReadBufferPool,
- batchShuffleReadIOExecutor);
+ batchShuffleReadIOExecutor,
+ bufferCompressor);
// Create producer client.
TieredStorageProducerClient tieredStorageProducerClient =
@@ -199,7 +200,8 @@ public class TieredResultPartitionFactory {
List<TierShuffleDescriptor> tierShuffleDescriptors,
FileChannelManager fileChannelManager,
BatchShuffleReadBufferPool batchShuffleReadBufferPool,
- ScheduledExecutorService batchShuffleReadIOExecutor) {
+ ScheduledExecutorService batchShuffleReadIOExecutor,
+ @Nullable BufferCompressor bufferCompressor) {
List<TierProducerAgent> tierProducerAgents = new ArrayList<>();
List<TieredStorageMemorySpec> tieredStorageMemorySpecs = new
ArrayList<>();
@@ -238,7 +240,8 @@ public class TieredResultPartitionFactory {
Collections.singletonList(tierShuffleDescriptors.get(index)),
Math.max(
2 *
batchShuffleReadBufferPool.getNumBuffersPerRequest(),
- numberOfSubpartitions));
+ numberOfSubpartitions),
+ bufferCompressor);
tierProducerAgents.add(producerAgent);
tieredStorageMemorySpecs.add(tierFactory.getProducerAgentMemorySpec());
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 9a03aef0005..64edce34c49 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -144,21 +144,21 @@ public class TieredStorageProducerClient {
TieredStorageSubpartitionId subpartitionId,
Buffer accumulatedBuffer,
int numRemainingConsecutiveBuffers) {
+ int unCompressedSize = accumulatedBuffer.readableBytes();
try {
- Buffer compressedBuffer =
compressBufferIfPossible(accumulatedBuffer);
if
(currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] == null) {
chooseStorageTierToStartSegment(subpartitionId,
numRemainingConsecutiveBuffers + 1);
}
if
(!currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
subpartitionId,
- compressedBuffer,
+ accumulatedBuffer,
bufferAccumulator,
numRemainingConsecutiveBuffers)) {
chooseStorageTierToStartSegment(subpartitionId,
numRemainingConsecutiveBuffers + 1);
checkState(
currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
subpartitionId,
- compressedBuffer,
+ accumulatedBuffer,
bufferAccumulator,
numRemainingConsecutiveBuffers),
"Failed to write the first buffer to the new segment");
@@ -167,7 +167,7 @@ public class TieredStorageProducerClient {
accumulatedBuffer.recycleBuffer();
ExceptionUtils.rethrow(ioe);
}
- updateMetricStatistics(1, accumulatedBuffer.readableBytes());
+ updateMetricStatistics(1, unCompressedSize);
}
private void chooseStorageTierToStartSegment(
@@ -188,22 +188,6 @@ public class TieredStorageProducerClient {
throw new IOException("Failed to choose a storage tier to start a new
segment.");
}
- private Buffer compressBufferIfPossible(Buffer buffer) {
- if (!canBeCompressed(buffer)) {
- return buffer;
- }
-
- return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
- }
-
- /**
- * Whether the buffer can be compressed or not. Note that event is not
compressed because it is
- * usually small and the size can become even larger after compression.
- */
- private boolean canBeCompressed(Buffer buffer) {
- return bufferCompressor != null && buffer.isBuffer() &&
buffer.readableBytes() > 0;
- }
-
private void updateMetricStatistics(int numWriteBuffersDelta, int
numWriteBytesDelta) {
checkNotNull(metricStatisticsUpdater)
.accept(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
index fbfce81f2ee..11fb089a4b6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
@@ -20,6 +20,7 @@ package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
@@ -27,6 +28,8 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.Tiere
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@@ -61,7 +64,8 @@ public interface TierFactory {
BatchShuffleReadBufferPool bufferPool,
ScheduledExecutorService ioExecutor,
List<TierShuffleDescriptor> shuffleDescriptors,
- int maxRequestedBuffer);
+ int maxRequestedBuffer,
+ @Nullable BufferCompressor bufferCompressor);
/** Creates the consumer-side agent of a Tier. */
TierConsumerAgent createConsumerAgent(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
index 786aab43c5f..c3a1ee2c4f9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
@@ -20,6 +20,7 @@ package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFileIndex;
@@ -38,6 +39,8 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import javax.annotation.Nullable;
+
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
@@ -106,7 +109,8 @@ public class DiskTierFactory implements TierFactory {
BatchShuffleReadBufferPool bufferPool,
ScheduledExecutorService ioExecutor,
List<TierShuffleDescriptor> shuffleDescriptors,
- int maxRequestedBuffers) {
+ int maxRequestedBuffers,
+ @Nullable BufferCompressor bufferCompressor) {
checkState(bufferSizeBytes > 0);
ProducerMergedPartitionFileIndex partitionFileIndex =
@@ -139,7 +143,8 @@ public class DiskTierFactory implements TierFactory {
bufferPool,
ioExecutor,
maxRequestedBuffers,
- DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT);
+ DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT,
+ bufferCompressor);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index e8d35ea6e39..366a64e033c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -22,6 +22,7 @@ import
org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
@@ -48,6 +49,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -80,6 +82,8 @@ public class DiskTierProducerAgent implements
TierProducerAgent, NettyServicePro
private final DiskIOScheduler diskIOScheduler;
+ private final BufferCompressor bufferCompressor;
+
private volatile boolean isReleased;
DiskTierProducerAgent(
@@ -99,7 +103,8 @@ public class DiskTierProducerAgent implements
TierProducerAgent, NettyServicePro
BatchShuffleReadBufferPool bufferPool,
ScheduledExecutorService ioExecutor,
int maxRequestedBuffers,
- Duration bufferRequestTimeout) {
+ Duration bufferRequestTimeout,
+ BufferCompressor bufferCompressor) {
checkArgument(
numBytesPerSegment >= bufferSizeBytes,
"One segment should contain at least one buffer.");
@@ -112,6 +117,7 @@ public class DiskTierProducerAgent implements
TierProducerAgent, NettyServicePro
this.memoryManager = memoryManager;
this.firstBufferIndexInSegment = new ArrayList<>();
this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
+ this.bufferCompressor = bufferCompressor;
for (int i = 0; i < numSubpartitions; ++i) {
// Each map is used to store the segment ids belonging to a
subpartition. The map can be
@@ -177,11 +183,12 @@ public class DiskTierProducerAgent implements
TierProducerAgent, NettyServicePro
currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
return false;
}
- if (finishedBuffer.isBuffer()) {
- memoryManager.transferBufferOwnership(bufferOwner,
getDiskTierName(), finishedBuffer);
+ Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer,
bufferCompressor);
+ if (compressedBuffer.isBuffer()) {
+ memoryManager.transferBufferOwnership(bufferOwner,
getDiskTierName(), compressedBuffer);
}
currentSubpartitionWriteBuffers[subpartitionIndex]++;
- emitBuffer(finishedBuffer, subpartitionIndex,
numRemainingConsecutiveBuffers == 0);
+ emitBuffer(compressedBuffer, subpartitionIndex,
numRemainingConsecutiveBuffers == 0);
return true;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
index b2d84acb364..8ef55c6b59e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
@@ -20,6 +20,7 @@ package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
@@ -34,6 +35,8 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@@ -91,7 +94,8 @@ public class MemoryTierFactory implements TierFactory {
BatchShuffleReadBufferPool bufferPool,
ScheduledExecutorService ioExecutor,
List<TierShuffleDescriptor> shuffleDescriptors,
- int maxRequestedBuffers) {
+ int maxRequestedBuffers,
+ @Nullable BufferCompressor bufferCompressor) {
checkState(bufferSizeBytes > 0);
return new MemoryTierProducerAgent(
@@ -103,7 +107,8 @@ public class MemoryTierFactory implements TierFactory {
isBroadcastOnly,
memoryManager,
nettyService,
- resourceRegistry);
+ resourceRegistry,
+ bufferCompressor);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
index 973b8cdf741..54ba4cdd71f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
@@ -40,6 +41,7 @@ import java.io.IOException;
import java.util.Arrays;
import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -66,6 +68,8 @@ public class MemoryTierProducerAgent implements
TierProducerAgent, NettyServiceP
private final MemoryTierSubpartitionProducerAgent[]
subpartitionProducerAgents;
+ private final BufferCompressor bufferCompressor;
+
public MemoryTierProducerAgent(
TieredStoragePartitionId partitionId,
int numSubpartitions,
@@ -75,7 +79,8 @@ public class MemoryTierProducerAgent implements
TierProducerAgent, NettyServiceP
boolean isBroadcastOnly,
TieredStorageMemoryManager memoryManager,
TieredStorageNettyService nettyService,
- TieredStorageResourceRegistry resourceRegistry) {
+ TieredStorageResourceRegistry resourceRegistry,
+ BufferCompressor bufferCompressor) {
checkArgument(
segmentSizeBytes >= bufferSizeBytes,
"One segment should contain at least one buffer.");
@@ -89,6 +94,7 @@ public class MemoryTierProducerAgent implements
TierProducerAgent, NettyServiceP
this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
this.nettyConnectionEstablished = new boolean[numSubpartitions];
this.subpartitionProducerAgents = new
MemoryTierSubpartitionProducerAgent[numSubpartitions];
+ this.bufferCompressor = bufferCompressor;
Arrays.fill(currentSubpartitionWriteBuffers, 0);
nettyService.registerProducer(partitionId, this);
@@ -138,11 +144,13 @@ public class MemoryTierProducerAgent implements
TierProducerAgent, NettyServiceP
currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
return false;
}
- if (finishedBuffer.isBuffer()) {
- memoryManager.transferBufferOwnership(bufferOwner,
getMemoryTierName(), finishedBuffer);
+ Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer,
bufferCompressor);
+ if (compressedBuffer.isBuffer()) {
+ memoryManager.transferBufferOwnership(
+ bufferOwner, getMemoryTierName(), compressedBuffer);
}
currentSubpartitionWriteBuffers[subpartitionIndex]++;
- addFinishedBuffer(finishedBuffer, subpartitionIndex);
+ addFinishedBuffer(compressedBuffer, subpartitionIndex);
return true;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
index 5c0f72ecea5..3d374715a9b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
@@ -21,6 +21,7 @@ package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
@@ -37,6 +38,8 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@@ -101,7 +104,8 @@ public class RemoteTierFactory implements TierFactory {
BatchShuffleReadBufferPool bufferPool,
ScheduledExecutorService ioExecutor,
List<TierShuffleDescriptor> shuffleDescriptors,
- int maxRequestedBuffers) {
+ int maxRequestedBuffers,
+ @Nullable BufferCompressor bufferCompressor) {
checkState(bufferSizeBytes > 0);
checkNotNull(remoteStoragePath);
@@ -115,7 +119,8 @@ public class RemoteTierFactory implements TierFactory {
isBroadcastOnly,
partitionFileWriter,
storageMemoryManager,
- resourceRegistry);
+ resourceRegistry,
+ bufferCompressor);
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
index 8444c29fec9..e3cfcb960aa 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
@@ -19,6 +19,7 @@
package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
@@ -28,6 +29,7 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
import java.util.Arrays;
+import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -44,6 +46,8 @@ public class RemoteTierProducerAgent implements
TierProducerAgent {
private final int[] currentSubpartitionSegmentWriteBuffers;
+ private final BufferCompressor bufferCompressor;
+
RemoteTierProducerAgent(
TieredStoragePartitionId partitionId,
int numSubpartitions,
@@ -52,7 +56,8 @@ public class RemoteTierProducerAgent implements
TierProducerAgent {
boolean isBroadcastOnly,
PartitionFileWriter partitionFileWriter,
TieredStorageMemoryManager memoryManager,
- TieredStorageResourceRegistry resourceRegistry) {
+ TieredStorageResourceRegistry resourceRegistry,
+ BufferCompressor bufferCompressor) {
checkArgument(
numBytesPerSegment >= bufferSizeBytes,
"One segment should contain at least one buffer.");
@@ -67,6 +72,7 @@ public class RemoteTierProducerAgent implements
TierProducerAgent {
memoryManager,
partitionFileWriter);
this.currentSubpartitionSegmentWriteBuffers = new
int[numSubpartitions];
+ this.bufferCompressor = bufferCompressor;
Arrays.fill(currentSubpartitionSegmentWriteBuffers, 0);
resourceRegistry.registerResource(partitionId,
this::releaseAllResources);
}
@@ -95,11 +101,13 @@ public class RemoteTierProducerAgent implements
TierProducerAgent {
currentSubpartitionSegmentWriteBuffers[subpartitionIndex] = 0;
return false;
}
- if (buffer.isBuffer()) {
- memoryManager.transferBufferOwnership(bufferOwner,
getRemoteTierName(), buffer);
+ Buffer compressedBuffer = compressBufferIfPossible(buffer,
bufferCompressor);
+ if (compressedBuffer.isBuffer()) {
+ memoryManager.transferBufferOwnership(
+ bufferOwner, getRemoteTierName(), compressedBuffer);
}
currentSubpartitionSegmentWriteBuffers[subpartitionIndex]++;
- cacheDataManager.appendBuffer(buffer, subpartitionIndex);
+ cacheDataManager.appendBuffer(compressedBuffer, subpartitionIndex);
return true;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
index 041ac33c480..9d87d8e9bc9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
@@ -20,6 +20,7 @@ package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
@@ -28,6 +29,8 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMast
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
@@ -115,7 +118,8 @@ public class TestingTierFactory implements TierFactory {
BatchShuffleReadBufferPool bufferPool,
ScheduledExecutorService ioExecutor,
List<TierShuffleDescriptor> shuffleDescriptors,
- int maxRequestedBuffers) {
+ int maxRequestedBuffers,
+ @Nullable BufferCompressor bufferCompressor) {
return tierProducerAgentSupplier.get();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
index 7444dcda671..33a3682c0e1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
@@ -229,6 +229,7 @@ public class DiskTierProducerAgentTest {
new BatchShuffleReadBufferPool(1, 1),
new ManuallyTriggeredScheduledExecutorService(),
0,
- Duration.ofMinutes(5));
+ Duration.ofMinutes(5),
+ null);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
index 170ce2e018a..873ccc70197 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
@@ -121,7 +121,8 @@ class MemoryTierProducerAgentTest {
false,
memoryManager,
nettyService,
- new TieredStorageResourceRegistry())) {
+ new TieredStorageResourceRegistry(),
+ null)) {
memoryTierProducerAgent.connectionEstablished(
SUBPARTITION_ID, new
TestingNettyConnectionWriter.Builder().build());
assertThat(memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0,
0)).isFalse();
@@ -220,6 +221,7 @@ class MemoryTierProducerAgentTest {
isBroadcastOnly,
memoryManager,
nettyService,
- resourceRegistry);
+ resourceRegistry,
+ null);
}
}