This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push:
new 92b0e80a34b [FLINK-37783] Auto-disable buffer debloating for hybrid
shuffle.
92b0e80a34b is described below
commit 92b0e80a34baa90a068e182189a4768496a03d67
Author: David Moravek <[email protected]>
AuthorDate: Tue May 13 10:28:07 2025 +0200
[FLINK-37783] Auto-disable buffer debloating for hybrid shuffle.
---
.../io/network/partition/consumer/IndexedInputGate.java | 8 ++++++++
.../io/network/partition/consumer/SingleInputGate.java | 6 +-----
.../flink/runtime/taskmanager/InputGateWithMetrics.java | 6 ++++++
.../apache/flink/streaming/runtime/tasks/StreamTask.java | 8 ++++++++
.../flink/streaming/runtime/io/MockIndexedInputGate.java | 6 ++++++
.../apache/flink/streaming/runtime/io/MockInputGate.java | 6 ++++++
.../checkpointing/AlignedCheckpointsMassiveRandomTest.java | 6 ++++++
.../org/apache/flink/test/runtime/HybridShuffleITCase.java | 14 ++++++++++++++
8 files changed, 55 insertions(+), 5 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index bae9755754d..5daa277cd9b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -20,6 +20,7 @@ package
org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import java.io.IOException;
import java.util.List;
@@ -65,5 +66,12 @@ public abstract class IndexedInputGate extends InputGate
implements Checkpointab
getChannel(channelIndex).convertToPriorityEvent(sequenceNumber);
}
+ /**
+ * Returns the type of this input gate's consumed result partition.
+ *
+ * @return consumed result partition type
+ */
+ public abstract ResultPartitionType getConsumedPartitionType();
+
public abstract void triggerDebloating();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 354b1cf5241..a1d306a153c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -471,11 +471,7 @@ public class SingleInputGate extends IndexedInputGate {
return bufferDebloater.getLastEstimatedTimeToConsumeBuffers();
}
- /**
- * Returns the type of this input channel's consumed result partition.
- *
- * @return consumed result partition type
- */
+ @Override
public ResultPartitionType getConsumedPartitionType() {
return consumedPartitionType;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index e3f6cfda6a5..31483b89df6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -85,6 +86,11 @@ public class InputGateWithMetrics extends IndexedInputGate {
return inputGate.getUnfinishedChannels();
}
+ @Override
+ public ResultPartitionType getConsumedPartitionType() {
+ return inputGate.getConsumedPartitionType();
+ }
+
@Override
public void triggerDebloating() {
inputGate.triggerDebloating();
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a5f26dbc0e5..3fa90d8a76e 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -56,6 +56,7 @@ import
org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -865,12 +866,19 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
}
private void scheduleBufferDebloater() {
+ // Hybrid shuffle cannot adjust buffer sizes yet, so buffer debloating is skipped for it.
+ final boolean isHybridShuffle =
+ Arrays.stream(getEnvironment().getAllInputGates())
+ .map(IndexedInputGate::getConsumedPartitionType)
+
.anyMatch(ResultPartitionType::isHybridResultPartition);
+
// See https://issues.apache.org/jira/browse/FLINK-23560
// If there are no input gates, there is no point of calculating the
throughput and running
// the debloater. At the same time, for SourceStreamTask using legacy
sources and checkpoint
// lock, enqueuing even a single mailbox action can cause performance
regression. This is
// especially visible in batch, with disabled checkpointing and no
processing time timers.
if (getEnvironment().getAllInputGates().length == 0
+ || isHybridShuffle
|| !environment
.getTaskManagerInfo()
.getConfiguration()
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index ae4721c3484..635510893a1 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -126,6 +127,11 @@ public class MockIndexedInputGate extends IndexedInputGate
{
return Collections.emptyList();
}
+ @Override
+ public ResultPartitionType getConsumedPartitionType() {
+ return ResultPartitionType.PIPELINED;
+ }
+
@Override
public void triggerDebloating() {}
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 1c095d66119..a7746b67cad 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -163,6 +164,11 @@ public class MockInputGate extends IndexedInputGate {
}
}
+ @Override
+ public ResultPartitionType getConsumedPartitionType() {
+ return ResultPartitionType.PIPELINED;
+ }
+
@Override
public void triggerDebloating() {}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
index 42086fa8812..940ec70f363 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
@@ -24,6 +24,7 @@ import
org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -240,6 +241,11 @@ public class AlignedCheckpointsMassiveRandomTest {
@Override
public void checkpointStopped(long cancelledCheckpointId) {}
+ @Override
+ public ResultPartitionType getConsumedPartitionType() {
+ return ResultPartitionType.PIPELINED;
+ }
+
@Override
public void triggerDebloating() {}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
index 9b606b03826..702a475e2b0 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -31,6 +31,7 @@ import
org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
@@ -91,6 +92,19 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
executeJob(jobGraph, configuration, numRecordsToSend);
}
+ @TestTemplate
+ public void testAutoDisableBufferDebloat() throws Exception {
+ final int numRecordsToSend = 1_000_000;
+ Configuration configuration =
configureHybridOptions(getConfiguration(), false);
+ configuration.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true);
+ configuration.set(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD,
Duration.ofMillis(1));
+
+ JobGraph jobGraph =
+ createJobGraph(
+ numRecordsToSend, false, configuration,
enableAdaptiveAutoParallelism);
+ executeJob(jobGraph, configuration, numRecordsToSend);
+ }
+
private Configuration configureHybridOptions(Configuration configuration,
boolean isSelective) {
BatchShuffleMode shuffleMode =
isSelective