wsry commented on a change in pull request #17936:
URL: https://github.com/apache/flink/pull/17936#discussion_r767578165



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
##########
@@ -119,6 +119,12 @@
     @GuardedBy("lock")
     private volatile boolean isReleased;
 
+    /** Number of buffers recycled in the last loop. */
+    private long lastNumRecycledBuffers;

Review comment:
       I guess this can be a local variable.
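A minimal sketch of what I mean. `RecycleCounter` is a hypothetical stand-in for the buffer pool's recycled-buffer count. The previous count is snapshotted into a local variable at the start of the call and compared at the end, so the scheduler keeps no extra mutable state between runs.

```java
import java.util.concurrent.atomic.AtomicLong;

class LocalCounterSketch {
    /** Hypothetical stand-in for the buffer pool's recycled-buffer counter. */
    static final class RecycleCounter {
        private final AtomicLong numRecycledBuffers = new AtomicLong();

        void recycle() {
            numRecycledBuffers.incrementAndGet();
        }

        long getNumRecycledBuffers() {
            return numRecycledBuffers.get();
        }
    }

    /** Returns true if at least one buffer was recycled while {@code work} ran. */
    static boolean recycledDuring(RecycleCounter counter, Runnable work) {
        // Local variable scoped to this call instead of a field on the scheduler.
        long lastNumRecycledBuffers = counter.getNumRecycledBuffers();
        work.run();
        return counter.getNumRecycledBuffers() > lastNumRecycledBuffers;
    }
}
```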

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
##########
@@ -119,6 +119,12 @@
     @GuardedBy("lock")
     private volatile boolean isReleased;
 
+    /** Number of buffers recycled in the last loop. */
+    private long lastNumRecycledBuffers;
+
+    /** Deadline of requesting buffers. */
+    private Deadline requestBufferDeadline;

Review comment:
       I think we can remove this field and pass BUFFER_REQUEST_TIMEOUT via the
constructor.
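A rough sketch, assuming a hypothetical `TimeoutConfiguredScheduler` in place of the real scheduler. The timeout becomes a final constructor argument. The no-arg constructor uses BUFFER_REQUEST_TIMEOUT, assumed here to be 5 min. Tests can pass a short timeout instead of overwriting a deadline field.

```java
import java.time.Duration;
import java.util.Objects;

class TimeoutConfiguredScheduler {
    // Assumed value; the real constant lives in SortMergeResultPartitionReadScheduler.
    static final Duration BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5);

    private final Duration bufferRequestTimeout;

    /** Production path: use the default timeout. */
    TimeoutConfiguredScheduler() {
        this(BUFFER_REQUEST_TIMEOUT);
    }

    /** Package-private so tests can inject a shorter timeout. */
    TimeoutConfiguredScheduler(Duration bufferRequestTimeout) {
        this.bufferRequestTimeout = Objects.requireNonNull(bufferRequestTimeout);
    }

    Duration getBufferRequestTimeout() {
        return bufferRequestTimeout;
    }
}
```

With this, the @VisibleForTesting setter is no longer needed either.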

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
##########
@@ -154,8 +160,9 @@ public synchronized void run() {
         }
 
         try {
-            Deadline deadline = Deadline.fromNow(BUFFER_REQUEST_TIMEOUT);
-            while (deadline.hasTimeLeft()) {
+            requestBufferDeadline = Deadline.fromNow(BUFFER_REQUEST_TIMEOUT);
+            lastNumRecycledBuffers = bufferPool.getNumRecycledBuffers();
+            while (getBufferRequestDeadline().hasTimeLeft()) {

Review comment:
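       I guess this logic may double the timeout to 10 min in the worst case (e.g. a
buffer is recycled right after the request starts, but the deadline is only extended
once the first 5 min have expired). We need to reset the timeout whenever a buffer is
recycled. For example, we can add a lastBufferRecycleTimestamp to the buffer pool and
use (lastBufferRecycleTimestamp + 5 min) as the deadline. WDYT?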
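To make the idea concrete, here is a hedged sketch of the deadline computation. `BufferRequestDeadlines` is hypothetical. Taking the max with the request start time is my own assumption; it keeps a stale recycle timestamp from causing an immediate timeout.

```java
import java.time.Duration;

/**
 * Hypothetical helper: the deadline is anchored at the later of the request start
 * and the last buffer recycle, so each recycle pushes it forward and we never wait
 * more than one timeout after the last recycle.
 */
final class BufferRequestDeadlines {
    private BufferRequestDeadlines() {}

    static long deadlineMillis(
            long requestStartMillis, long lastBufferRecycleTimestamp, Duration timeout) {
        // Assumption: a recycle that happened before this request started
        // must not make the request time out immediately.
        return Math.max(requestStartMillis, lastBufferRecycleTimestamp) + timeout.toMillis();
    }

    static boolean hasTimeLeft(
            long nowMillis, long requestStartMillis, long lastBufferRecycleTimestamp, Duration timeout) {
        return nowMillis < deadlineMillis(requestStartMillis, lastBufferRecycleTimestamp, timeout);
    }
}
```

For example, with a 5 min timeout, a request starting at t = 0 and a recycle at t = 1 min gives a deadline of t = 6 min. That is at most 5 min of waiting after the last recycle, instead of up to 10 min in total.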

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
##########
@@ -178,6 +185,23 @@ public synchronized void run() {
         return new ArrayDeque<>();
     }
 
+    @VisibleForTesting
+    void setRequestBufferDeadline(Deadline deadline) {
+        this.requestBufferDeadline = deadline;
+    }
+
+    @VisibleForTesting
+    Deadline getBufferRequestDeadline() {

Review comment:
       I guess we can make it private; we can just check the end-to-end time to
ensure it works.
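A possible shape for such an end-to-end check, as a hypothetical test helper (`EndToEndTimeoutCheck` is not an existing utility). It runs the blocking call and expects a TimeoutException that arrives no earlier than the configured timeout. No internal deadline is read, so the getter can stay private. Only a lower bound is asserted, because an upper bound would be flaky on slow CI machines.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

final class EndToEndTimeoutCheck {
    private EndToEndTimeoutCheck() {}

    /** Returns the measured time until the expected TimeoutException was thrown. */
    static Duration assertTimesOutNoEarlierThan(Duration timeout, Callable<?> blockingCall)
            throws Exception {
        long start = System.nanoTime();
        try {
            blockingCall.call();
        } catch (TimeoutException expected) {
            Duration elapsed = Duration.ofNanos(System.nanoTime() - start);
            // Lower bound only: failing earlier than the timeout is a real bug.
            if (elapsed.compareTo(timeout) < 0) {
                throw new AssertionError("Timed out too early: " + elapsed);
            }
            return elapsed;
        }
        throw new AssertionError("Expected a TimeoutException");
    }
}
```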

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
##########
@@ -456,7 +456,7 @@ public ResultSubpartitionView createSubpartitionView(
             checkState(!isReleased(), "Partition released.");
             checkState(isFinished(), "Trying to read unfinished blocking partition.");
 
-            return readScheduler.crateSubpartitionReader(

Review comment:
       Thanks for also fixing this typo. I think the typo fix can be a separate
hotfix commit.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +194,53 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testIncreaseDeadlineWhenRecycleBuffer() throws Exception {
+        testRequestDeadlineInternal(true);
+        assertTrue(readScheduler.getBufferRequestDeadline().hasTimeLeft());

Review comment:
       I guess we can make allocateBuffers package-private (@VisibleForTesting) and
verify the timeout exception. Only verifying the deadline without verifying the end
result seems a little weak. BTW, in this test, asserting that the deadline still has
time left may make the test unstable in a slow test environment.
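A simplified sketch of that direction, using a hypothetical `ToyReadScheduler` rather than the real class. `allocateBuffers()` is package-private, but its signature and the queue-based pool are assumptions. A test can then call `allocateBuffers()` directly and assert the end result, a TimeoutException, instead of inspecting an internal deadline. The polling loop keeps the sketch simple and is not meant to mirror the real waiting logic.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeoutException;

class ToyReadScheduler {
    private final Queue<byte[]> pool; // stand-in for the shared read buffer pool
    private final Duration bufferRequestTimeout;

    ToyReadScheduler(Queue<byte[]> pool, Duration bufferRequestTimeout) {
        this.pool = pool;
        this.bufferRequestTimeout = bufferRequestTimeout;
    }

    /** Package-private for testing: takes all available buffers or times out. */
    Queue<byte[]> allocateBuffers() throws TimeoutException, InterruptedException {
        long deadline = System.nanoTime() + bufferRequestTimeout.toNanos();
        while (System.nanoTime() < deadline) {
            synchronized (pool) {
                if (!pool.isEmpty()) {
                    Queue<byte[]> buffers = new ArrayDeque<>(pool);
                    pool.clear();
                    return buffers;
                }
            }
            Thread.sleep(5); // simple polling, only for this sketch
        }
        throw new TimeoutException("Buffer request timeout.");
    }
}
```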

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +194,53 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testIncreaseDeadlineWhenRecycleBuffer() throws Exception {
+        testRequestDeadlineInternal(true);
+        assertTrue(readScheduler.getBufferRequestDeadline().hasTimeLeft());
+    }
+
+    @Test
+    public void testDeadlineTimeoutWhenNoRecycleBuffer() throws Exception {
+        testRequestDeadlineInternal(false);
+        assertFalse(readScheduler.getBufferRequestDeadline().hasTimeLeft());
+    }
+
+    private void testRequestDeadlineInternal(boolean needTriggerRecycleBuffer) throws Exception {
+        readScheduler.setRequestBufferDeadline(Deadline.fromNow(Duration.ofMillis(500)));
+        assertTrue(readScheduler.getBufferRequestDeadline().hasTimeLeft());
+
+        readScheduler.run();
+        // Waiting for the deadline timeout
+        Thread.sleep(800);
+
+        if (needTriggerRecycleBuffer) {
+            // If at least one buffer is recycled, the deadline will be increased
+            triggerRecycleBuffer();
+        }
+    }
+
+    private void triggerRecycleBuffer() throws Exception {

Review comment:
       We can just request buffers from the buffer pool and recycle them back to it
directly, which makes the logic less complicated.
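Roughly what I mean, sketched against a hypothetical `ToyBufferPool`. This is not Flink's BatchShuffleReadBufferPool API, and the method names are assumptions. The test takes buffers out of the pool and hands them back itself. It does not drive a full read through the scheduler just to trigger a recycle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class ToyBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    private long numRecycledBuffers;
    private long lastBufferRecycleTimestamp;

    ToyBufferPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    /** Takes up to {@code count} buffers; may return fewer if the pool runs dry. */
    synchronized List<byte[]> requestBuffers(int count) {
        List<byte[]> buffers = new ArrayList<>();
        while (buffers.size() < count && !available.isEmpty()) {
            buffers.add(available.poll());
        }
        return buffers;
    }

    /** Returns buffers to the pool and records when the recycle happened. */
    synchronized void recycle(Collection<byte[]> buffers) {
        available.addAll(buffers);
        numRecycledBuffers += buffers.size();
        lastBufferRecycleTimestamp = System.currentTimeMillis();
    }

    synchronized int numAvailable() {
        return available.size();
    }

    synchronized long getNumRecycledBuffers() {
        return numRecycledBuffers;
    }

    synchronized long getLastBufferRecycleTimestamp() {
        return lastBufferRecycleTimestamp;
    }
}
```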

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +194,53 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testIncreaseDeadlineWhenRecycleBuffer() throws Exception {

Review comment:
       About the test, I think we should cover at least 3 cases:
   1. The buffer request times out when no buffer is recycled.
   2. The buffer request timeout is refreshed when a buffer is recycled.
   3. The buffer request still times out after a buffer recycle if no further buffer is recycled.
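A hedged sketch of the three cases. It uses a hypothetical `ManualClockDeadline` with a manually advanced clock, so each case is deterministic and needs no Thread.sleep. It assumes the deadline is "last activity + timeout", where the last activity is the request start or the last recycle. A real test would drive the actual scheduler and buffer pool instead.

```java
import java.time.Duration;

class ManualClockDeadline {
    private final long timeoutMillis;
    private long nowMillis;          // manual clock, starts at 0
    private long lastActivityMillis; // request start or last buffer recycle

    ManualClockDeadline(Duration timeout) {
        this.timeoutMillis = timeout.toMillis();
    }

    void advance(Duration d) {
        nowMillis += d.toMillis();
    }

    void onBufferRecycled() {
        lastActivityMillis = nowMillis;
    }

    boolean isTimedOut() {
        return nowMillis >= lastActivityMillis + timeoutMillis;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMinutes(5);

        // Case 1: times out when no buffer is recycled.
        ManualClockDeadline c1 = new ManualClockDeadline(timeout);
        c1.advance(timeout);
        check(c1.isTimedOut(), "case 1");

        // Case 2: a recycle before the deadline refreshes it.
        ManualClockDeadline c2 = new ManualClockDeadline(timeout);
        c2.advance(Duration.ofMinutes(4));
        c2.onBufferRecycled();
        c2.advance(Duration.ofMinutes(4)); // 8 min after start, 4 min after recycle
        check(!c2.isTimedOut(), "case 2");

        // Case 3: still times out one full timeout after the last recycle.
        c2.advance(Duration.ofMinutes(1)); // exactly 5 min after recycle
        check(c2.isTimedOut(), "case 3");
        System.out.println("all three cases pass");
    }

    private static void check(boolean condition, String name) {
        if (!condition) {
            throw new AssertionError(name + " failed");
        }
    }
}
```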

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
##########
@@ -178,6 +185,23 @@ public synchronized void run() {
         return new ArrayDeque<>();
     }
 
+    @VisibleForTesting
+    void setRequestBufferDeadline(Deadline deadline) {

Review comment:
       After removing the requestBufferDeadline field, I think we can also
remove this method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
