This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c7f6470bb8c [FLINK-33077][runtime] Minimize the risk of hard back-pressure with buffer debloating enabled
c7f6470bb8c is described below
commit c7f6470bb8cc314e7651b03e171af057f4edec1e
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Aug 11 13:51:07 2023 +0200
[FLINK-33077][runtime] Minimize the risk of hard back-pressure with buffer debloating enabled
Problem:
Under back-pressure, buffer debloating shrinks the buffer size down to
256 bytes (the default taskmanager.memory.min-segment-size).
Such small buffers may not be enough to hold the output produced for a
single input record. The task thread then has to request new buffers, and
it often blocks while doing so.
This results in significant checkpoint delays (up to minutes instead of
seconds).
Adding more overdraft buffers helps, but the number required depends on the
job's degree of parallelism (DoP).
Raising taskmanager.memory.min-segment-size above 256 bytes helps, but the
value required depends on the multiplication factor of the operator (how much
output it emits per input record).
Solution:
- Ignore the Buffer Debloater hint and extend the current buffer (where
possible) when the hinted size is too small to hold the output record fully
AND the buffer pool has no further buffer available.
- Extend the buffer by one extra byte so that it is not immediately flushed
as full. Subsequent output records (e.g. from flatMap-like and join
operators) can then be appended to the same buffer.
---
.../partition/BufferWritingResultPartition.java | 31 +++++++++++++++++++---
.../io/network/partition/ResultPartitionTest.java | 21 +++++++++++++++
2 files changed, 48 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index c4ced3d7c25..ddfe117cab7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -295,11 +295,34 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
addToSubpartition(buffer, targetSubpartition, 0,
record.remaining());
}
- buffer.appendAndCommit(record);
+ append(record, buffer);
return buffer;
}
+ private int append(ByteBuffer record, BufferBuilder buffer) {
+ // Try to avoid hard back-pressure in the subsequent calls to request
buffers
+ // by ignoring Buffer Debloater hints and extending the buffer if
possible (trim).
+ // This decreases the probability of hard back-pressure in cases when
+ // the output size varies significantly and BD suggests too small
values.
+ // The hint will be re-applied on the next iteration.
+ if (record.remaining() >= buffer.getWritableBytes()) {
+ // This 2nd check is expensive, so it shouldn't be re-ordered.
+ // However, it has the same cost as the subsequent call to request
buffer, so it doesn't
+ // affect the performance much.
+ if (!bufferPool.isAvailable()) {
+ // add 1 byte to prevent immediately flushing the buffer and
potentially fit the
+ // next record
+ int newSize =
+ buffer.getMaxCapacity()
+ + (record.remaining() -
buffer.getWritableBytes())
+ + 1;
+ buffer.trim(Math.max(buffer.getMaxCapacity(), newSize));
+ }
+ }
+ return buffer.appendAndCommit(record);
+ }
+
private void addToSubpartition(
BufferBuilder buffer,
int targetSubpartition,
@@ -339,7 +362,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
// starting
// with a complete record.
// !! The next two lines can not change order.
- final int partialRecordBytes =
buffer.appendAndCommit(remainingRecordBytes);
+ final int partialRecordBytes = append(remainingRecordBytes, buffer);
addToSubpartition(buffer, targetSubpartition, partialRecordBytes,
partialRecordBytes);
return buffer;
@@ -354,7 +377,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
createBroadcastBufferConsumers(buffer, 0, record.remaining());
}
- buffer.appendAndCommit(record);
+ append(record, buffer);
return buffer;
}
@@ -368,7 +391,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
// starting
// with a complete record.
// !! The next two lines can not change order.
- final int partialRecordBytes =
buffer.appendAndCommit(remainingRecordBytes);
+ final int partialRecordBytes = append(remainingRecordBytes, buffer);
createBroadcastBufferConsumers(buffer, partialRecordBytes,
partialRecordBytes);
return buffer;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index af66ea24df8..4422d40cb6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -477,6 +477,27 @@ class ResultPartitionTest {
}
}
+ @Test
+ void testEmitRecordExpandsLastBuffer() throws IOException {
+ int recordSize = 10;
+ int maxBufferSize = 2 * recordSize;
+ // create a pool with just 1 buffer - so that the test times out in
case of back-pressure
+ NetworkBufferPool globalPool = new NetworkBufferPool(1, maxBufferSize);
+ BufferPool localPool = globalPool.createBufferPool(1, 1, 1,
Integer.MAX_VALUE, 0);
+ ResultPartition resultPartition =
+ new ResultPartitionBuilder().setBufferPoolFactory(() ->
localPool).build();
+ resultPartition.setup();
+ // emulate BufferDebloater - and suggest small buffer size
+ resultPartition.createSubpartitionView(0, () ->
{}).notifyNewBufferSize(1);
+ // need to insert two records: the 1st one expands the buffer
regardless of back-pressure
+ resultPartition.emitRecord(ByteBuffer.allocate(recordSize), 0);
+ // insert the 2nd record:
+ // - the buffer should still be available for writing after the
previous record
+ // - it should be resized again to fit the new record fully
+ // - so no new buffer is necessary and there is no back-pressure
+ resultPartition.emitRecord(ByteBuffer.allocate(recordSize), 0);
+ }
+
@Test
void testBroadcastRecordWithRecordSpanningMultipleBuffers() throws
Exception {
BufferWritingResultPartition bufferWritingResultPartition =