This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 52c72f2f2df [FLINK-36021][network] Delegating the responsibility for compression to every tier
52c72f2f2df is described below
commit 52c72f2f2dfcc85d2e5ce78d866f6895e7f65adc
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Aug 9 15:16:57 2024 +0800
[FLINK-36021][network] Delegating the responsibility for compression to every tier
---
.../hybrid/tiered/common/TieredStorageUtils.java | 42 ++++++++++++++++++++++
.../storage/TieredStorageProducerClient.java | 24 +++----------
.../hybrid/tiered/tier/disk/DiskTierFactory.java | 10 +++++-
.../tiered/tier/disk/DiskTierProducerAgent.java | 15 +++++---
.../tiered/tier/memory/MemoryTierFactory.java | 10 +++++-
.../tier/memory/MemoryTierProducerAgent.java | 16 ++++++---
.../tiered/tier/remote/RemoteTierFactory.java | 9 ++++-
.../tier/remote/RemoteTierProducerAgent.java | 16 ++++++---
.../tier/disk/DiskTierProducerAgentTest.java | 3 +-
.../tier/memory/MemoryTierProducerAgentTest.java | 6 ++--
10 files changed, 113 insertions(+), 38 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
index 8565daf14d2..562b7c529d7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
@@ -19,11 +19,16 @@
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import
org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.BufferAccumulator;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator;
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory;
@@ -32,6 +37,8 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.R
import java.nio.ByteBuffer;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/** Utils for reading from or writing to tiered storage. */
public class TieredStorageUtils {
@@ -128,4 +135,39 @@ public class TieredStorageUtils {
bufferWithHeaders[index] = header;
bufferWithHeaders[index + 1] = buffer.getNioBufferReadable();
}
+
+ /** Try compress buffer if possible. */
+ public static Buffer compressBufferIfPossible(
+ Buffer buffer, BufferCompressor bufferCompressor) {
+ if (!canBeCompressed(buffer, bufferCompressor)) {
+ return buffer;
+ }
+
+ return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
+ }
+
+ /**
+ * Whether the buffer can be compressed or not. Note that event is not compressed because it is
+ * usually small and the size can become even larger after compression.
+ */
+ public static boolean canBeCompressed(Buffer buffer, BufferCompressor
bufferCompressor) {
+ return bufferCompressor != null && buffer.isBuffer() &&
buffer.readableBytes() > 0;
+ }
+
+ /**
+ * Construct the {@link BufferCompressor} from configuration.
+ *
+ * <p>Note: This is just a workaround for released version as we can not change the interface of
+ * {@link TierFactory}.
+ */
+ public static BufferCompressor buildBufferCompressor(
+ int bufferSizeBytes, Configuration configuration) {
+ CompressionCodec compressionCodec =
+
configuration.get(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);
+ boolean compressionEnabled =
+
configuration.get(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED);
+ return compressionEnabled && compressionCodec != CompressionCodec.NONE
+ ? new BufferCompressor(bufferSizeBytes, compressionCodec)
+ : null;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
index 80a712d9146..4586444e91b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java
@@ -145,21 +145,21 @@ public class TieredStorageProducerClient {
TieredStorageSubpartitionId subpartitionId,
Buffer accumulatedBuffer,
int numRemainingConsecutiveBuffers) {
+ int unCompressedSize = accumulatedBuffer.readableBytes();
try {
- Buffer compressedBuffer =
compressBufferIfPossible(accumulatedBuffer);
if
(currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] == null) {
chooseStorageTierToStartSegment(subpartitionId,
numRemainingConsecutiveBuffers + 1);
}
if
(!currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
subpartitionId,
- compressedBuffer,
+ accumulatedBuffer,
bufferAccumulator,
numRemainingConsecutiveBuffers)) {
chooseStorageTierToStartSegment(subpartitionId,
numRemainingConsecutiveBuffers + 1);
checkState(
currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].tryWrite(
subpartitionId,
- compressedBuffer,
+ accumulatedBuffer,
bufferAccumulator,
numRemainingConsecutiveBuffers),
"Failed to write the first buffer to the new segment");
@@ -168,7 +168,7 @@ public class TieredStorageProducerClient {
accumulatedBuffer.recycleBuffer();
ExceptionUtils.rethrow(ioe);
}
- updateMetricStatistics(1, accumulatedBuffer.readableBytes());
+ updateMetricStatistics(1, unCompressedSize);
}
private void chooseStorageTierToStartSegment(
@@ -189,22 +189,6 @@ public class TieredStorageProducerClient {
throw new IOException("Failed to choose a storage tier to start a new
segment.");
}
- private Buffer compressBufferIfPossible(Buffer buffer) {
- if (!canBeCompressed(buffer)) {
- return buffer;
- }
-
- return checkNotNull(bufferCompressor).compressToOriginalBuffer(buffer);
- }
-
- /**
- * Whether the buffer can be compressed or not. Note that event is not compressed because it is
- * usually small and the size can become even larger after compression.
- */
- private boolean canBeCompressed(Buffer buffer) {
- return bufferCompressor != null && buffer.isBuffer() &&
buffer.readableBytes() > 0;
- }
-
private void updateMetricStatistics(int numWriteBuffersDelta, int
numWriteBytesDelta) {
checkNotNull(metricStatisticsUpdater)
.accept(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
index 786aab43c5f..a750fe17e1e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
@@ -38,15 +38,19 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import javax.annotation.Nullable;
+
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
+import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.buildBufferCompressor;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.DATA_FILE_SUFFIX;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.INDEX_FILE_SUFFIX;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/** The implementation of {@link TierFactory} for disk tier. */
@@ -68,9 +72,12 @@ public class DiskTierFactory implements TierFactory {
private int bufferSizeBytes = -1;
+ @Nullable private Configuration conf;
+
@Override
public void setup(Configuration configuration) {
this.bufferSizeBytes =
ConfigurationParserUtils.getPageSize(configuration);
+ this.conf = checkNotNull(configuration);
}
@Override
@@ -139,7 +146,8 @@ public class DiskTierFactory implements TierFactory {
bufferPool,
ioExecutor,
maxRequestedBuffers,
- DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT);
+ DEFAULT_DISK_TIER_BUFFER_REQUEST_TIMEOUT,
+ buildBufferCompressor(bufferSizeBytes, checkNotNull(conf)));
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index e8d35ea6e39..366a64e033c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -22,6 +22,7 @@ import
org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
@@ -48,6 +49,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
+import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -80,6 +82,8 @@ public class DiskTierProducerAgent implements
TierProducerAgent, NettyServicePro
private final DiskIOScheduler diskIOScheduler;
+ private final BufferCompressor bufferCompressor;
+
private volatile boolean isReleased;
DiskTierProducerAgent(
@@ -99,7 +103,8 @@ public class DiskTierProducerAgent implements
TierProducerAgent, NettyServicePro
BatchShuffleReadBufferPool bufferPool,
ScheduledExecutorService ioExecutor,
int maxRequestedBuffers,
- Duration bufferRequestTimeout) {
+ Duration bufferRequestTimeout,
+ BufferCompressor bufferCompressor) {
checkArgument(
numBytesPerSegment >= bufferSizeBytes,
"One segment should contain at least one buffer.");
@@ -112,6 +117,7 @@ public class DiskTierProducerAgent implements
TierProducerAgent, NettyServicePro
this.memoryManager = memoryManager;
this.firstBufferIndexInSegment = new ArrayList<>();
this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
+ this.bufferCompressor = bufferCompressor;
for (int i = 0; i < numSubpartitions; ++i) {
// Each map is used to store the segment ids belonging to a
subpartition. The map can be
@@ -177,11 +183,12 @@ public class DiskTierProducerAgent implements
TierProducerAgent, NettyServicePro
currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
return false;
}
- if (finishedBuffer.isBuffer()) {
- memoryManager.transferBufferOwnership(bufferOwner,
getDiskTierName(), finishedBuffer);
+ Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer,
bufferCompressor);
+ if (compressedBuffer.isBuffer()) {
+ memoryManager.transferBufferOwnership(bufferOwner,
getDiskTierName(), compressedBuffer);
}
currentSubpartitionWriteBuffers[subpartitionIndex]++;
- emitBuffer(finishedBuffer, subpartitionIndex,
numRemainingConsecutiveBuffers == 0);
+ emitBuffer(compressedBuffer, subpartitionIndex,
numRemainingConsecutiveBuffers == 0);
return true;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
index b2d84acb364..77fddde1742 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
@@ -34,10 +34,14 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
+import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.buildBufferCompressor;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
/** The implementation of {@link TierFactory} for memory tier. */
@@ -51,9 +55,12 @@ public class MemoryTierFactory implements TierFactory {
private int bufferSizeBytes = -1;
+ @Nullable private Configuration conf;
+
@Override
public void setup(Configuration configuration) {
this.bufferSizeBytes =
ConfigurationParserUtils.getPageSize(configuration);
+ this.conf = checkNotNull(configuration);
}
@Override
@@ -103,7 +110,8 @@ public class MemoryTierFactory implements TierFactory {
isBroadcastOnly,
memoryManager,
nettyService,
- resourceRegistry);
+ resourceRegistry,
+ buildBufferCompressor(bufferSizeBytes, checkNotNull(conf)));
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
index 973b8cdf741..54ba4cdd71f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
@@ -40,6 +41,7 @@ import java.io.IOException;
import java.util.Arrays;
import static
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -66,6 +68,8 @@ public class MemoryTierProducerAgent implements
TierProducerAgent, NettyServiceP
private final MemoryTierSubpartitionProducerAgent[]
subpartitionProducerAgents;
+ private final BufferCompressor bufferCompressor;
+
public MemoryTierProducerAgent(
TieredStoragePartitionId partitionId,
int numSubpartitions,
@@ -75,7 +79,8 @@ public class MemoryTierProducerAgent implements
TierProducerAgent, NettyServiceP
boolean isBroadcastOnly,
TieredStorageMemoryManager memoryManager,
TieredStorageNettyService nettyService,
- TieredStorageResourceRegistry resourceRegistry) {
+ TieredStorageResourceRegistry resourceRegistry,
+ BufferCompressor bufferCompressor) {
checkArgument(
segmentSizeBytes >= bufferSizeBytes,
"One segment should contain at least one buffer.");
@@ -89,6 +94,7 @@ public class MemoryTierProducerAgent implements
TierProducerAgent, NettyServiceP
this.currentSubpartitionWriteBuffers = new int[numSubpartitions];
this.nettyConnectionEstablished = new boolean[numSubpartitions];
this.subpartitionProducerAgents = new
MemoryTierSubpartitionProducerAgent[numSubpartitions];
+ this.bufferCompressor = bufferCompressor;
Arrays.fill(currentSubpartitionWriteBuffers, 0);
nettyService.registerProducer(partitionId, this);
@@ -138,11 +144,13 @@ public class MemoryTierProducerAgent implements
TierProducerAgent, NettyServiceP
currentSubpartitionWriteBuffers[subpartitionIndex] = 0;
return false;
}
- if (finishedBuffer.isBuffer()) {
- memoryManager.transferBufferOwnership(bufferOwner,
getMemoryTierName(), finishedBuffer);
+ Buffer compressedBuffer = compressBufferIfPossible(finishedBuffer,
bufferCompressor);
+ if (compressedBuffer.isBuffer()) {
+ memoryManager.transferBufferOwnership(
+ bufferOwner, getMemoryTierName(), compressedBuffer);
}
currentSubpartitionWriteBuffers[subpartitionIndex]++;
- addFinishedBuffer(finishedBuffer, subpartitionIndex);
+ addFinishedBuffer(compressedBuffer, subpartitionIndex);
return true;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
index 5c0f72ecea5..bcb36a7fb47 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
@@ -37,9 +37,12 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
+import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.buildBufferCompressor;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -57,9 +60,12 @@ public class RemoteTierFactory implements TierFactory {
private String remoteStoragePath = DEFAULT_REMOTE_STORAGE_BASE_PATH;
+ @Nullable private Configuration conf;
+
@Override
public void setup(Configuration configuration) {
this.bufferSizeBytes =
ConfigurationParserUtils.getPageSize(configuration);
+ this.conf = checkNotNull(configuration);
this.remoteStoragePath =
checkNotNull(
configuration.get(
@@ -115,7 +121,8 @@ public class RemoteTierFactory implements TierFactory {
isBroadcastOnly,
partitionFileWriter,
storageMemoryManager,
- resourceRegistry);
+ resourceRegistry,
+ buildBufferCompressor(bufferSizeBytes, checkNotNull(conf)));
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
index 8444c29fec9..e3cfcb960aa 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
@@ -19,6 +19,7 @@
package
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileWriter;
@@ -28,6 +29,7 @@ import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
import java.util.Arrays;
+import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.compressBufferIfPossible;
import static
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -44,6 +46,8 @@ public class RemoteTierProducerAgent implements
TierProducerAgent {
private final int[] currentSubpartitionSegmentWriteBuffers;
+ private final BufferCompressor bufferCompressor;
+
RemoteTierProducerAgent(
TieredStoragePartitionId partitionId,
int numSubpartitions,
@@ -52,7 +56,8 @@ public class RemoteTierProducerAgent implements
TierProducerAgent {
boolean isBroadcastOnly,
PartitionFileWriter partitionFileWriter,
TieredStorageMemoryManager memoryManager,
- TieredStorageResourceRegistry resourceRegistry) {
+ TieredStorageResourceRegistry resourceRegistry,
+ BufferCompressor bufferCompressor) {
checkArgument(
numBytesPerSegment >= bufferSizeBytes,
"One segment should contain at least one buffer.");
@@ -67,6 +72,7 @@ public class RemoteTierProducerAgent implements
TierProducerAgent {
memoryManager,
partitionFileWriter);
this.currentSubpartitionSegmentWriteBuffers = new
int[numSubpartitions];
+ this.bufferCompressor = bufferCompressor;
Arrays.fill(currentSubpartitionSegmentWriteBuffers, 0);
resourceRegistry.registerResource(partitionId,
this::releaseAllResources);
}
@@ -95,11 +101,13 @@ public class RemoteTierProducerAgent implements
TierProducerAgent {
currentSubpartitionSegmentWriteBuffers[subpartitionIndex] = 0;
return false;
}
- if (buffer.isBuffer()) {
- memoryManager.transferBufferOwnership(bufferOwner,
getRemoteTierName(), buffer);
+ Buffer compressedBuffer = compressBufferIfPossible(buffer,
bufferCompressor);
+ if (compressedBuffer.isBuffer()) {
+ memoryManager.transferBufferOwnership(
+ bufferOwner, getRemoteTierName(), compressedBuffer);
}
currentSubpartitionSegmentWriteBuffers[subpartitionIndex]++;
- cacheDataManager.appendBuffer(buffer, subpartitionIndex);
+ cacheDataManager.appendBuffer(compressedBuffer, subpartitionIndex);
return true;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
index 7444dcda671..33a3682c0e1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgentTest.java
@@ -229,6 +229,7 @@ public class DiskTierProducerAgentTest {
new BatchShuffleReadBufferPool(1, 1),
new ManuallyTriggeredScheduledExecutorService(),
0,
- Duration.ofMinutes(5));
+ Duration.ofMinutes(5),
+ null);
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
index 170ce2e018a..873ccc70197 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgentTest.java
@@ -121,7 +121,8 @@ class MemoryTierProducerAgentTest {
false,
memoryManager,
nettyService,
- new TieredStorageResourceRegistry())) {
+ new TieredStorageResourceRegistry(),
+ null)) {
memoryTierProducerAgent.connectionEstablished(
SUBPARTITION_ID, new
TestingNettyConnectionWriter.Builder().build());
assertThat(memoryTierProducerAgent.tryStartNewSegment(SUBPARTITION_ID, 0,
0)).isFalse();
@@ -220,6 +221,7 @@ class MemoryTierProducerAgentTest {
isBroadcastOnly,
memoryManager,
nettyService,
- resourceRegistry);
+ resourceRegistry,
+ null);
}
}