This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c7f6470bb8c [FLINK-33077][runtime] Minimize the risk of hard 
back-pressure with buffer debloating enabled
c7f6470bb8c is described below

commit c7f6470bb8cc314e7651b03e171af057f4edec1e
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Aug 11 13:51:07 2023 +0200

    [FLINK-33077][runtime] Minimize the risk of hard back-pressure with buffer 
debloating enabled
    
    Problem:
    Buffer debloating sets buffer size to 256 bytes because of back-pressure.
    Such small buffers might not be enough to emit the processing results of a 
single record. The task thread would request new buffers, and often block.
    That results in significant checkpoint delays (up to minutes instead of 
seconds).
    
    Adding more overdraft buffers helps, but depends on the job's degree of parallelism (DoP).
    Raising taskmanager.memory.min-segment-size from 256 helps, but depends on the multiplication factor of the operator.
    
    Solution:
    - Ignore Buffer Debloater hints and extend the buffer if possible - when the hinted buffer size would prevent an output record from being emitted fully AND this is the last available buffer.
    - Prevent the subsequent flush of the buffer (by extending it by one extra byte) so that more output records can be emitted (e.g. by flatMap-like and join operators).
---
 .../partition/BufferWritingResultPartition.java    | 31 +++++++++++++++++++---
 .../io/network/partition/ResultPartitionTest.java  | 21 +++++++++++++++
 2 files changed, 48 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index c4ced3d7c25..ddfe117cab7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -295,11 +295,34 @@ public abstract class BufferWritingResultPartition 
extends ResultPartition {
             addToSubpartition(buffer, targetSubpartition, 0, 
record.remaining());
         }
 
-        buffer.appendAndCommit(record);
+        append(record, buffer);
 
         return buffer;
     }
 
+    private int append(ByteBuffer record, BufferBuilder buffer) {
+        // Try to avoid hard back-pressure in the subsequent calls to request 
buffers
+        // by ignoring Buffer Debloater hints and extending the buffer if 
possible (trim).
+        // This decreases the probability of hard back-pressure in cases when
+        // the output size varies significantly and BD suggests too small 
values.
+        // The hint will be re-applied on the next iteration.
+        if (record.remaining() >= buffer.getWritableBytes()) {
+            // This 2nd check is expensive, so it shouldn't be re-ordered.
+            // However, it has the same cost as the subsequent call to request 
buffer, so it doesn't
+            // affect the performance much.
+            if (!bufferPool.isAvailable()) {
+                // add 1 byte to prevent immediately flushing the buffer and 
potentially fit the
+                // next record
+                int newSize =
+                        buffer.getMaxCapacity()
+                                + (record.remaining() - 
buffer.getWritableBytes())
+                                + 1;
+                buffer.trim(Math.max(buffer.getMaxCapacity(), newSize));
+            }
+        }
+        return buffer.appendAndCommit(record);
+    }
+
     private void addToSubpartition(
             BufferBuilder buffer,
             int targetSubpartition,
@@ -339,7 +362,7 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
         // starting
         // with a complete record.
         // !! The next two lines can not change order.
-        final int partialRecordBytes = 
buffer.appendAndCommit(remainingRecordBytes);
+        final int partialRecordBytes = append(remainingRecordBytes, buffer);
         addToSubpartition(buffer, targetSubpartition, partialRecordBytes, 
partialRecordBytes);
 
         return buffer;
@@ -354,7 +377,7 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
             createBroadcastBufferConsumers(buffer, 0, record.remaining());
         }
 
-        buffer.appendAndCommit(record);
+        append(record, buffer);
 
         return buffer;
     }
@@ -368,7 +391,7 @@ public abstract class BufferWritingResultPartition extends 
ResultPartition {
         // starting
         // with a complete record.
         // !! The next two lines can not change order.
-        final int partialRecordBytes = 
buffer.appendAndCommit(remainingRecordBytes);
+        final int partialRecordBytes = append(remainingRecordBytes, buffer);
         createBroadcastBufferConsumers(buffer, partialRecordBytes, 
partialRecordBytes);
 
         return buffer;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index af66ea24df8..4422d40cb6b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -477,6 +477,27 @@ class ResultPartitionTest {
         }
     }
 
+    @Test
+    void testEmitRecordExpandsLastBuffer() throws IOException {
+        int recordSize = 10;
+        int maxBufferSize = 2 * recordSize;
+        // create a pool with just 1 buffer - so that the test times out in 
case of back-pressure
+        NetworkBufferPool globalPool = new NetworkBufferPool(1, maxBufferSize);
+        BufferPool localPool = globalPool.createBufferPool(1, 1, 1, 
Integer.MAX_VALUE, 0);
+        ResultPartition resultPartition =
+                new ResultPartitionBuilder().setBufferPoolFactory(() -> 
localPool).build();
+        resultPartition.setup();
+        // emulate BufferDebloater - and suggest small buffer size
+        resultPartition.createSubpartitionView(0, () -> 
{}).notifyNewBufferSize(1);
+        // need to insert two records: the 1st one expands the buffer 
regardless of back-pressure
+        resultPartition.emitRecord(ByteBuffer.allocate(recordSize), 0);
+        // insert the 2nd record:
+        // - the buffer should still be available for writing after the 
previous record
+        // - it should be resized again to fit the new record fully
+        // - so no new buffer is necessary and there is no back-pressure
+        resultPartition.emitRecord(ByteBuffer.allocate(recordSize), 0);
+    }
+
     @Test
     void testBroadcastRecordWithRecordSpanningMultipleBuffers() throws 
Exception {
         BufferWritingResultPartition bufferWritingResultPartition =

Reply via email to