This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 92b0e80a34b [FLINK-37783] Auto-disable buffer debloating for hybrid shuffle.
92b0e80a34b is described below

commit 92b0e80a34baa90a068e182189a4768496a03d67
Author: David Moravek <[email protected]>
AuthorDate: Tue May 13 10:28:07 2025 +0200

    [FLINK-37783] Auto-disable buffer debloating for hybrid shuffle.
---
 .../io/network/partition/consumer/IndexedInputGate.java    |  8 ++++++++
 .../io/network/partition/consumer/SingleInputGate.java     |  6 +-----
 .../flink/runtime/taskmanager/InputGateWithMetrics.java    |  6 ++++++
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   |  8 ++++++++
 .../flink/streaming/runtime/io/MockIndexedInputGate.java   |  6 ++++++
 .../apache/flink/streaming/runtime/io/MockInputGate.java   |  6 ++++++
 .../checkpointing/AlignedCheckpointsMassiveRandomTest.java |  6 ++++++
 .../org/apache/flink/test/runtime/HybridShuffleITCase.java | 14 ++++++++++++++
 8 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
index bae9755754d..5daa277cd9b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 
 import java.io.IOException;
 import java.util.List;
@@ -65,5 +66,12 @@ public abstract class IndexedInputGate extends InputGate implements Checkpointab
         getChannel(channelIndex).convertToPriorityEvent(sequenceNumber);
     }
 
+    /**
+     * Returns the type of this input channel's consumed result partition.
+     *
+     * @return consumed result partition type
+     */
+    public abstract ResultPartitionType getConsumedPartitionType();
+
     public abstract void triggerDebloating();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 354b1cf5241..a1d306a153c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -471,11 +471,7 @@ public class SingleInputGate extends IndexedInputGate {
         return bufferDebloater.getLastEstimatedTimeToConsumeBuffers();
     }
 
-    /**
-     * Returns the type of this input channel's consumed result partition.
-     *
-     * @return consumed result partition type
-     */
+    @Override
     public ResultPartitionType getConsumedPartitionType() {
         return consumedPartitionType;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index e3f6cfda6a5..31483b89df6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -85,6 +86,11 @@ public class InputGateWithMetrics extends IndexedInputGate {
         return inputGate.getUnfinishedChannels();
     }
 
+    @Override
+    public ResultPartitionType getConsumedPartitionType() {
+        return inputGate.getConsumedPartitionType();
+    }
+
     @Override
     public void triggerDebloating() {
         inputGate.triggerDebloating();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a5f26dbc0e5..3fa90d8a76e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -56,6 +56,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
 import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -865,12 +866,19 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
     }
 
     private void scheduleBufferDebloater() {
+        // Hybrid shuffle currently can't adjust buffer size.
+        final boolean isHybridShuffle =
+                Arrays.stream(getEnvironment().getAllInputGates())
+                        .map(IndexedInputGate::getConsumedPartitionType)
+                        .anyMatch(ResultPartitionType::isHybridResultPartition);
+
         // See https://issues.apache.org/jira/browse/FLINK-23560
         // If there are no input gates, there is no point of calculating the throughput and running
         // the debloater. At the same time, for SourceStreamTask using legacy sources and checkpoint
         // lock, enqueuing even a single mailbox action can cause performance regression. This is
         // especially visible in batch, with disabled checkpointing and no processing time timers.
         if (getEnvironment().getAllInputGates().length == 0
+                || isHybridShuffle
                 || !environment
                         .getTaskManagerInfo()
                         .getConfiguration()
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
index ae4721c3484..635510893a1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockIndexedInputGate.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -126,6 +127,11 @@ public class MockIndexedInputGate extends IndexedInputGate {
         return Collections.emptyList();
     }
 
+    @Override
+    public ResultPartitionType getConsumedPartitionType() {
+        return ResultPartitionType.PIPELINED;
+    }
+
     @Override
     public void triggerDebloating() {}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 1c095d66119..a7746b67cad 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -163,6 +164,11 @@ public class MockInputGate extends IndexedInputGate {
         }
     }
 
+    @Override
+    public ResultPartitionType getConsumedPartitionType() {
+        return ResultPartitionType.PIPELINED;
+    }
+
     @Override
     public void triggerDebloating() {}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
index 42086fa8812..940ec70f363 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlignedCheckpointsMassiveRandomTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
@@ -240,6 +241,11 @@ public class AlignedCheckpointsMassiveRandomTest {
         @Override
         public void checkpointStopped(long cancelledCheckpointId) {}
 
+        @Override
+        public ResultPartitionType getConsumedPartitionType() {
+            return ResultPartitionType.PIPELINED;
+        }
+
         @Override
         public void triggerDebloating() {}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
index 9b606b03826..702a475e2b0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/HybridShuffleITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -91,6 +92,19 @@ class HybridShuffleITCase extends BatchShuffleITCaseBase {
         executeJob(jobGraph, configuration, numRecordsToSend);
     }
 
+    @TestTemplate
+    public void testAutoDisableBufferDebloat() throws Exception {
+        final int numRecordsToSend = 1_000_000;
+        Configuration configuration = configureHybridOptions(getConfiguration(), false);
+        configuration.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, true);
+        configuration.set(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD, Duration.ofMillis(1));
+
+        JobGraph jobGraph =
+                createJobGraph(
+                        numRecordsToSend, false, configuration, enableAdaptiveAutoParallelism);
+        executeJob(jobGraph, configuration, numRecordsToSend);
+    }
+
     private Configuration configureHybridOptions(Configuration configuration, boolean isSelective) {
         BatchShuffleMode shuffleMode =
                 isSelective

Reply via email to