This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 960e3b752c0 [FLINK-37783] Auto-disable buffer debloating for hybrid shuffle.
960e3b752c0 is described below
commit 960e3b752c075cf5800a813991ad6ade27b657de
Author: David Moravek <[email protected]>
AuthorDate: Tue May 13 10:28:07 2025 +0200
[FLINK-37783] Auto-disable buffer debloating for hybrid shuffle.
---
.../io/network/partition/consumer/IndexedInputGate.java | 8 ++++++++
.../io/network/partition/consumer/SingleInputGate.java | 6 +-----
.../flink/runtime/taskmanager/InputGateWithMetrics.java | 6 ++++++
.../apache/flink/streaming/runtime/tasks/StreamTask.java | 8 ++++++++
.../flink/streaming/runtime/io/MockIndexedInputGate.java | 6 ++++++
.../apache/flink/streaming/runtime/io/MockInputGate.java | 6 ++++++
.../checkpointing/AlignedCheckpointsMassiveRandomTest.java | 6 ++++++
.../org/apache/flink/test/runtime/HybridShuffleITCase.java | 14 ++++++++++++++
8 files changed, 55 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index bae9755754d..5daa277cd9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import java.io.IOException;
import java.util.List;
@@ -65,5 +66,12 @@ public abstract class IndexedInputGate extends InputGate implements Checkpointab
getChannel(channelIndex).convertToPriorityEvent(sequenceNumber);
}
+ /**
+ * Returns the type of the result partition consumed by this input gate.
+ *
+ * @return the {@link ResultPartitionType} of the consumed result partition
+ */
+ public abstract ResultPartitionType getConsumedPartitionType();
+
public abstract void triggerDebloating();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index eb8543f7f60..a0bbb4e7b9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -475,11 +475,7 @@ public class SingleInputGate extends IndexedInputGate {
return bufferDebloater.getLastEstimatedTimeToConsumeBuffers();
}
- /**
- * Returns the type of this input channel's consumed result partition.
- *
- * @return consumed result partition type
- */
+ @Override // documented on the abstract IndexedInputGate#getConsumedPartitionType
public ResultPartitionType getConsumedPartitionType() {
return consumedPartitionType;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 122e36ed6a7..828d8e2ca3c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -89,6 +90,11 @@ public class InputGateWithMetrics extends IndexedInputGate {
return inputGate.getUnfinishedChannels();
}
+ @Override
+ public ResultPartitionType getConsumedPartitionType() {
+ return inputGate.getConsumedPartitionType(); // delegate to the wrapped gate, like the other accessors
+ }
+
@Override
public void triggerDebloating() {
inputGate.triggerDebloating();
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index b99061bfe90..73858e75ecc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -923,12 +924,19 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
private void scheduleBufferDebloater() {
+ // Hybrid shuffle currently can't adjust buffer size.
+ final boolean isHybridShuffle =
+ Arrays.stream(getEnvironment().getAllInputGates())
+ .map(IndexedInputGate::getConsumedPartitionType)
+
.anyMatch(ResultPartitionType::isHybridResultPartition);
+
// See https://issues.apache.org/jira/browse/FLINK-23560
// If there are no input gates, there is no point of calculating the
throughput and running
// the debloater. At the same time, for SourceStreamTask using legacy
sources and checkpoint
// lock, enqueuing even a single mailbox action can cause performance
regression. This is
// especially visible in batch, with disabled checkpointing and no
processing time timers.
if (getEnvironment().getAllInputGates().length == 0
+ || isHybridShuffle
|| !environment
.getTaskManagerInfo()
.getConfiguration()
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index 7415dcc306e..e0b2a3f8bfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -129,6 +130,11 @@ public class MockIndexedInputGate extends IndexedInputGate {
return Collections.emptyList();
}
+ @Override
+ public ResultPartitionType getConsumedPartitionType() {
+ return ResultPartitionType.PIPELINED; // fixed type for this mock; presumably non-hybrid, so debloating is not auto-disabled — confirm
+ }
+
@Override
public void triggerDebloating() {}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index da920ef252c..fb9b3c88e07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -169,6 +170,11 @@ public class MockInputGate extends IndexedInputGate {
}
}
+ @Override
+ public ResultPartitionType getConsumedPartitionType() {
+ return ResultPartitionType.PIPELINED; // fixed type for this mock; presumably non-hybrid, so debloating is not auto-disabled — confirm
+ }
+
@Override
public void triggerDebloating() {}
diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
index 1cf37f0afc5..16a30fe3cbb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -243,6 +244,11 @@ class AlignedCheckpointsMassiveRandomTest {
@Override
public void checkpointStopped(long cancelledCheckpointId) {}
+ @Override
+ public ResultPartitionType getConsumedPartitionType() {
+ return ResultPartitionType.PIPELINED; // fixed type for this test gate (presumably non-hybrid — confirm if relied upon)
+ }
+
@Override
public void triggerDebloating() {}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
index 9f438ea8f6d..048627226de 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@@ -83,6 +84,19 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
executeJob(jobGraph, configuration, numRecordsToSend);
}
+ @TestTemplate
+ void testAutoDisableBufferDebloat() throws Exception {
+ final int numRecordsToSend = 1_000_000;
+ Configuration configuration = configureHybridOptions(getConfiguration(), false); // hybrid shuffle, isSelective = false
+ configuration.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true); // explicitly request debloating
+ configuration.set(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD, Duration.ofMillis(1)); // debloater would fire constantly unless auto-disabled
+
+ JobGraph jobGraph =
+ createJobGraph(
+ numRecordsToSend, false, configuration, enableAdaptiveAutoParallelism);
+ executeJob(jobGraph, configuration, numRecordsToSend); // NOTE(review): only checks the job completes; skipping of debloating is not asserted directly
+ }
+
private Configuration configureHybridOptions(Configuration configuration,
boolean isSelective) {
BatchShuffleMode shuffleMode =
isSelective