TanYuxin-tyx commented on a change in pull request #17936:
URL: https://github.com/apache/flink/pull/17936#discussion_r780038225



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws 
Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, 
false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 1000, false);
+
+        Thread.sleep(3500);

Review comment:
       The request timeout is set to 3s in 
`testRequestTimeoutIsRefreshedAndSuccess`. Because the sleep time inside 
`BatchShuffleReadBufferPool#requestBuffers` is 2s, I think the request timeout 
should be greater than that value. 
   
   With the request timeout set to 3s, `firstRecycleTimeMs` is set to 2s and 
`secondRecycleTimeMs` is set to 4s, to show that the timeout is refreshed and 
extended when buffers are recycled.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws 
Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, 
false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 1000, false);
+
+        Thread.sleep(3500);
+
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));

Review comment:
       Fixed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws 
Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, 
false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 1000, false);
+
+        Thread.sleep(3500);
+
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestTimeoutAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to