This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a63b7dd35bdb1aaacd7b622d0c694ee06334759e
Author: fanrui <[email protected]>
AuthorDate: Wed Jun 29 11:35:58 2022 +0800

    [FLINK-27789][network] Disable overdraft buffer for LegacySource
---
 .../network/api/writer/MultipleRecordWriters.java  |  7 ++++++
 .../io/network/api/writer/NonRecordWriter.java     |  3 +++
 .../io/network/api/writer/RecordWriter.java        |  5 +++++
 .../network/api/writer/RecordWriterDelegate.java   |  3 +++
 .../network/api/writer/ResultPartitionWriter.java  |  3 +++
 .../io/network/api/writer/SingleRecordWriter.java  |  5 +++++
 .../runtime/io/network/buffer/BufferPool.java      |  6 +++++
 .../runtime/io/network/buffer/LocalBufferPool.java | 10 ++++++++-
 .../io/network/partition/ResultPartition.java      |  4 ++++
 .../runtime/io/network/buffer/NoOpBufferPool.java  | 10 +++++++++
 .../io/network/buffer/UnpooledBufferPool.java      |  8 +++++++
 .../partition/MockResultPartitionWriter.java       |  3 +++
 .../streaming/runtime/tasks/SourceStreamTask.java  |  1 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../runtime/tasks/SourceStreamTaskTest.java        | 26 ++++++++++++++++++++++
 15 files changed, 94 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
index cfcf702eb3c..9b0f99e55a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/MultipleRecordWriters.java
@@ -70,6 +70,13 @@ public class MultipleRecordWriters<T extends 
IOReadableWritable>
         return recordWriters.get(outputIndex);
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        for (RecordWriter<T> recordWriter : recordWriters) {
+            
recordWriter.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate);
+        }
+    }
+
     @Override
     public CompletableFuture<?> getAvailableFuture() {
         for (int i = 0; i < futures.length; i++) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java
index 84a58a61b0c..63783a66e89 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/NonRecordWriter.java
@@ -37,6 +37,9 @@ public class NonRecordWriter<T extends IOReadableWritable> 
implements RecordWrit
         throw new UnsupportedOperationException("No record writer instance.");
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) 
{}
+
     @Override
     public CompletableFuture<?> getAvailableFuture() {
         throw new UnsupportedOperationException("No record writer instance.");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 9d5f351ef89..57f0a800e37 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -217,6 +217,11 @@ public abstract class RecordWriter<T extends 
IOReadableWritable> implements Avai
         }
     }
 
+    /** Sets the max overdraft buffer size of per gate. */
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        
targetPartition.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate);
+    }
+
     // ------------------------------------------------------------------------
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java
index f2d99939cd6..4b4790c7db8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegate.java
@@ -45,4 +45,7 @@ public interface RecordWriterDelegate<T extends 
IOReadableWritable>
      * @param outputIndex the index respective to the record writer instance.
      */
     RecordWriter<T> getRecordWriter(int outputIndex);
+
+    /** Sets the max overdraft buffer size of per gate. */
+    void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 9a505722f6c..80774198c5c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -53,6 +53,9 @@ public interface ResultPartitionWriter extends AutoCloseable, 
AvailabilityProvid
 
     int getNumTargetKeyGroups();
 
+    /** Sets the max overdraft buffer size of per gate. */
+    void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate);
+
     /** Writes the given serialized record to the target subpartition. */
     void emitRecord(ByteBuffer record, int targetSubpartition) throws 
IOException;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java
index bd51cb5f1ca..e1cc51ea959 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/SingleRecordWriter.java
@@ -50,6 +50,11 @@ public class SingleRecordWriter<T extends 
IOReadableWritable> implements RecordW
         return recordWriter;
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        recordWriter.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate);
+    }
+
     @Override
     public CompletableFuture<?> getAvailableFuture() {
         return recordWriter.getAvailableFuture();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 8474bf8ae78..c574607e28e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -64,6 +64,12 @@ public interface BufferPool extends BufferProvider, 
BufferRecycler {
      */
     void setNumBuffers(int numBuffers);
 
+    /** Sets the max overdraft buffer size of per gate. */
+    void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate);
+
+    /** Returns the max overdraft buffer size of per gate. */
+    int getMaxOverdraftBuffersPerGate();
+
     /** Returns the number memory segments, which are currently held by this 
buffer pool. */
     int getNumberOfAvailableMemorySegments();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a5c54d3ea7a..e5bfc087982 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -119,7 +119,7 @@ class LocalBufferPool implements BufferPool {
     @GuardedBy("availableMemorySegments")
     private int unavailableSubpartitionsCount = 0;
 
-    private final int maxOverdraftBuffersPerGate;
+    private int maxOverdraftBuffersPerGate;
 
     @GuardedBy("availableMemorySegments")
     private int numberOfRequestedOverdraftMemorySegments;
@@ -665,6 +665,14 @@ class LocalBufferPool implements BufferPool {
         mayNotifyAvailable(toNotify);
     }
 
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        this.maxOverdraftBuffersPerGate = maxOverdraftBuffersPerGate;
+    }
+
+    public int getMaxOverdraftBuffersPerGate() {
+        return maxOverdraftBuffersPerGate;
+    }
+
     @Override
     public CompletableFuture<?> getAvailableFuture() {
         return availabilityHelper.getAvailableFuture();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 09b9582f0d0..ccdf8f4a138 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -192,6 +192,10 @@ public abstract class ResultPartition implements 
ResultPartitionWriter {
     /** Returns the number of queued buffers of the given target subpartition. 
*/
     public abstract int getNumberOfQueuedBuffers(int targetSubpartition);
 
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        
this.bufferPool.setMaxOverdraftBuffersPerGate(maxOverdraftBuffersPerGate);
+    }
+
     /**
      * Returns the type of this result partition.
      *
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
index bdd6b4b5276..d4ab33fa053 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
@@ -99,6 +99,16 @@ public class NoOpBufferPool implements BufferPool {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getMaxOverdraftBuffersPerGate() {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public int getNumberOfAvailableMemorySegments() {
         throw new UnsupportedOperationException();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
index eb4d01986b8..0ff07afe5f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/UnpooledBufferPool.java
@@ -100,6 +100,14 @@ public class UnpooledBufferPool implements BufferPool {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) 
{}
+
+    @Override
+    public int getMaxOverdraftBuffersPerGate() {
+        return 0;
+    }
+
     @Override
     public int getNumberOfAvailableMemorySegments() {
         return Integer.MAX_VALUE;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index 25ee8161ba4..99d87c8c066 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -53,6 +53,9 @@ public class MockResultPartitionWriter implements 
ResultPartitionWriter {
         return 1;
     }
 
+    @Override
+    public void setMaxOverdraftBuffersPerGate(int maxOverdraftBuffersPerGate) 
{}
+
     @Override
     public void emitRecord(ByteBuffer record, int targetSubpartition) throws 
IOException {}
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 0e5bdd049c0..04cb9dcca65 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -164,6 +164,7 @@ public class SourceStreamTask<
                 .gauge(
                         MetricNames.CHECKPOINT_START_DELAY_TIME,
                         this::getAsyncCheckpointStartDelayNanos);
+        recordWriter.setMaxOverdraftBuffersPerGate(0);
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index add4174135d..55a2f527a10 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -272,7 +272,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
     /** Thread pool for async snapshot workers. */
     private final ExecutorService asyncOperationsThreadPool;
 
-    private final 
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
+    protected final 
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
 
     protected final MailboxProcessor mailboxProcessor;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index d16c5abc2a4..3778a90eb6d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -95,6 +95,8 @@ import static 
org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpoint
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * These tests verify that the RichFunction methods are called (in correct 
order). And that
@@ -598,6 +600,30 @@ class SourceStreamTaskTest extends 
SourceStreamTaskTestBase {
         }
     }
 
+    @Test
+    void testDisableOverdraftBuffer() throws Exception {
+        try (NettyShuffleEnvironment env =
+                        new 
NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(2).build();
+                ResultPartition partitionWriter =
+                        PartitionTestUtils.createPartition(
+                                env, ResultPartitionType.PIPELINED_BOUNDED, 
1)) {
+            partitionWriter.setup();
+            
assertTrue(partitionWriter.getBufferPool().getMaxOverdraftBuffersPerGate() > 0);
+
+            final CompletableFuture<Long> checkpointCompleted = new 
CompletableFuture<>();
+            try (StreamTaskMailboxTestHarness<String> testHarness =
+                    new StreamTaskMailboxTestHarnessBuilder<>(
+                                    SourceStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO)
+                            .addAdditionalOutput(partitionWriter)
+                            .setupOperatorChain(new StreamSource<>(new 
MockSource(0, 0, 1)))
+                            
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+                            .build()) {
+
+                assertEquals(0, 
partitionWriter.getBufferPool().getMaxOverdraftBuffersPerGate());
+            }
+        }
+    }
+
     @Test
     void testClosedOnRestoreSourceSkipExecution() throws Exception {
         LifeCycleMonitorSource testSource = new LifeCycleMonitorSource();

Reply via email to