This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a63b7dd35bdb1aaacd7b622d0c694ee06334759e Author: fanrui <[email protected]> AuthorDate: Wed Jun 29 11:35:58 2022 +0800 [FLINK-27789][network] Disable overdraft buffer for LegacySource --- .../network/api/writer/MultipleRecordWriters.java | 7 ++++++ .../io/network/api/writer/NonRecordWriter.java | 3 +++ .../io/network/api/writer/RecordWriter.java | 5 +++++ .../network/api/writer/RecordWriterDelegate.java | 3 +++ .../network/api/writer/ResultPartitionWriter.java | 3 +++ .../io/network/api/writer/SingleRecordWriter.java | 5 +++++ .../runtime/io/network/buffer/BufferPool.java | 6 +++++ .../runtime/io/network/buffer/LocalBufferPool.java | 10 ++++++++- .../io/network/partition/ResultPartition.java | 4 ++++ .../runtime/io/network/buffer/NoOpBufferPool.java | 10 +++++++++ .../io/network/buffer/UnpooledBufferPool.java | 8 +++++++ .../partition/MockResultPartitionWriter.java | 3 +++ .../streaming/runtime/tasks/SourceStreamTask.java | 1 + .../flink/streaming/runtime/tasks/StreamTask.java | 2 +- .../runtime/tasks/SourceStreamTaskTest.java | 26 ++++++++++++++++++++++ 15 files changed, 94 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java index cfcf702eb3c..9b0f99e55a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java @@ -70,6 +70,13 @@ public class MultipleRecordWriters<T extends IOReadableWritable> return recordWriters.get(outputIndex); } + @Override + public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) { + for (RecordWriter<T> recordWriter : recordWriters) { + recordWriter.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate); + } + } + @Override public CompletableFuture<?> getAvailableFuture() { for (int i = 0; i < futures.length; i++) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java index 84a58a61b0c..63783a66e89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java @@ -37,6 +37,9 @@ public class NonRecordWriter<T extends IOReadableWritable> implements RecordWrit throw new UnsupportedOperationException("No record writer instance."); } + @Override + public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {} + @Override public CompletableFuture<?> getAvailableFuture() { throw new UnsupportedOperationException("No record writer instance."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 9d5f351ef89..57f0a800e37 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -217,6 +217,11 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai } } + /** Sets the max overdraft buffer size of per gate. */ + public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) { + targetPartition.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate); + } + // ------------------------------------------------------------------------ /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java index f2d99939cd6..4b4790c7db8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java @@ -45,4 +45,7 @@ public interface RecordWriterDelegate<T extends IOReadableWritable> * @param outputIndex the index respective to the record writer instance. */ RecordWriter<T> getRecordWriter(int outputIndex); + + /** Sets the max overdraft buffer size of per gate. */ + void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index 9a505722f6c..80774198c5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -53,6 +53,9 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid int getNumTargetKeyGroups(); + /** Sets the max overdraft buffer size of per gate. */ + void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate); + /** Writes the given serialized record to the target subpartition. */ void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java index bd51cb5f1ca..e1cc51ea959 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java @@ -50,6 +50,11 @@ public class SingleRecordWriter<T extends IOReadableWritable> implements RecordW return recordWriter; } + @Override + public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) { + recordWriter.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate); + } + @Override public CompletableFuture<?> getAvailableFuture() { return recordWriter.getAvailableFuture(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java index 8474bf8ae78..c574607e28e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java @@ -64,6 +64,12 @@ public interface BufferPool extends BufferProvider, BufferRecycler { */ void setNumBuffers(int numBuffers); + /** Sets the max overdraft buffer size of per gate. */ + void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate); + + /** Returns the max overdraft buffer size of per gate. */ + int getMaxOverdraftBuffersPerGate(); + /** Returns the number memory segments, which are currently held by this buffer pool. */ int getNumberOfAvailableMemorySegments(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java index a5c54d3ea7a..e5bfc087982 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java @@ -119,7 +119,7 @@ class LocalBufferPool implements BufferPool { @GuardedBy("availableMemorySegments") private int unavailableSubpartitionsCount = 0; - private final int maxOverdraftBuffersPerGate; + private int maxOverdraftBuffersPerGate; @GuardedBy("availableMemorySegments") private int numberOfRequestedOverdraftMemorySegments; @@ -665,6 +665,14 @@ class LocalBufferPool implements BufferPool { mayNotifyAvailable(toNotify); } + public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) { + this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate; + } + + public int getMaxOverdraftBuffersPerGate() { + return maxOverdraftBuffersPerGate; + } + @Override public CompletableFuture<?> getAvailableFuture() { return availabilityHelper.getAvailableFuture(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 09b9582f0d0..ccdf8f4a138 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -192,6 +192,10 @@ public abstract class ResultPartition implements ResultPartitionWriter { /** Returns the number of queued buffers of the given target subpartition. */ public abstract int getNumberOfQueuedBuffers(int targetSubpartition); + public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) { + this.bufferPool.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate); + } + /** * Returns the type of this result partition. * diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java index bdd6b4b5276..d4ab33fa053 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java @@ -99,6 +99,16 @@ public class NoOpBufferPool implements BufferPool { throw new UnsupportedOperationException(); } + @Override + public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMaxOverdraftBuffersPerGate() { + throw new UnsupportedOperationException(); + } + @Override public int getNumberOfAvailableMemorySegments() { throw new UnsupportedOperationException(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java index eb4d01986b8..0ff07afe5f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java @@ -100,6 +100,14 @@ public class UnpooledBufferPool implements BufferPool { throw new UnsupportedOperationException(); } + @Override + public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {} + + @Override + public int getMaxOverdraftBuffersPerGate() { + return 0; + } + @Override public int getNumberOfAvailableMemorySegments() { return Integer.MAX_VALUE; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java index 25ee8161ba4..99d87c8c066 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java @@ -53,6 +53,9 @@ public class MockResultPartitionWriter implements ResultPartitionWriter { return 1; } + @Override + public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {} + @Override public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 0e5bdd049c0..04cb9dcca65 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -164,6 +164,7 @@ public class SourceStreamTask< .gauge( MetricNames.CHECKPOINT_START_DELAY_TIME, this::getAsyncCheckpointStartDelayNanos); + recordWriter.setMaxOverdraftBuffersPerGate(0); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index add4174135d..55a2f527a10 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -272,7 +272,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> /** Thread pool for async snapshot workers. */ private final ExecutorService asyncOperationsThreadPool; - private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter; + protected final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter; protected final MailboxProcessor mailboxProcessor; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index d16c5abc2a4..3778a90eb6d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -95,6 +95,8 @@ import static org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpoint import static org.apache.flink.util.Preconditions.checkState; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * These tests verify that the RichFunction methods are called (in correct order). And that @@ -598,6 +600,30 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase { } } + @Test + void testDisableOverdraftBuffer() throws Exception { + try (NettyShuffleEnvironment env = + new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(2).build(); + ResultPartition partitionWriter = + PartitionTestUtils.createPartition( + env, ResultPartitionType.PIPELINED_BOUNDED, 1)) { + partitionWriter.setup(); + assertTrue(partitionWriter.getBufferPool().getMaxOverdraftBuffersPerGate() > 0); + + final CompletableFuture<Long> checkpointCompleted = new CompletableFuture<>(); + try (StreamTaskMailboxTestHarness<String> testHarness = + new StreamTaskMailboxTestHarnessBuilder<>( + SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO) + .addAdditionalOutput(partitionWriter) + .setupOperatorChain(new StreamSource<>(new MockSource(0, 0, 1))) + .finishForSingletonOperatorChain(StringSerializer.INSTANCE) + .build()) { + + assertEquals(0, partitionWriter.getBufferPool().getMaxOverdraftBuffersPerGate()); + } + } + } + @Test void testClosedOnRestoreSourceSkipExecution() throws Exception { LifeCycleMonitorSource testSource = new LifeCycleMonitorSource();
