wsry commented on a change in pull request #17936:
URL: https://github.com/apache/flink/pull/17936#discussion_r776961949



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
##########
@@ -121,12 +126,22 @@
 
     SortMergeResultPartitionReadScheduler(
             BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object lock) {
+        this(bufferPool, ioExecutor, lock, DEFAULT_BUFFER_REQUEST_TIMEOUT);
+    }
+
+    SortMergeResultPartitionReadScheduler(
+            BatchShuffleReadBufferPool bufferPool,
+            Executor ioExecutor,
+            Object lock,
+            Duration bufferRequestTimeout) {
+
         this.lock = checkNotNull(lock);
         this.bufferPool = checkNotNull(bufferPool);
         this.ioExecutor = checkNotNull(ioExecutor);
         // one partition reader can consume at most 32M (the expected buffers per request is 8M)
         // buffers for data read. Currently, it is only an empirical value can not be configured
         this.maxRequestedBuffers = Math.max(1, 4 * bufferPool.getNumBuffersPerRequest());
+        this.bufferRequestTimeout = bufferRequestTimeout;

Review comment:
       nit: checkNotNull
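
       For illustration, a minimal sketch of the suggested change (assuming the same static import of Preconditions.checkNotNull already used for the other field assignments):

           this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);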

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));

Review comment:
       nit: maybe use PriorityQueue directly instead of using a set first and then changing it to PriorityQueue?
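
       For example (a sketch; it assumes SortMergeSubpartitionReader stays comparable, as the scheduler's own PriorityQueue usage implies):

           Queue<SortMergeSubpartitionReader> allReaders = new PriorityQueue<>();
           allReaders.add(subpartitionReader);
           readScheduler.allocateBuffers(allReaders);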

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
##########
@@ -75,6 +75,10 @@
     @GuardedBy("buffers")
     private final Queue<MemorySegment> buffers = new ArrayDeque<>();
 
+    /** The timestamp when the last buffer is recycled or allocated. */
+    @GuardedBy("buffers")
+    private long lastBufferOperationTimestamp = System.nanoTime();

Review comment:
       nit: maybe we can add a test to guard that this value is updated after requesting/recycling buffers successfully. If buffer requesting is not successful, the value should not be updated.
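
       For instance, a sketch of such a test (the accessor getLastBufferOperationTimestamp() is hypothetical since the field is private and @GuardedBy("buffers"), so some test hook would need to be exposed; the pool setup is assumed to follow BatchShuffleReadBufferPoolTest):

           @Test
           public void testLastBufferOperationTimestampUpdated() throws Exception {
               // hypothetical accessor for the guarded private field
               long before = bufferPool.getLastBufferOperationTimestamp();

               // a successful request should refresh the timestamp (>= because
               // System.nanoTime() may not advance between the two reads)
               List<MemorySegment> segments = bufferPool.requestBuffers();
               assertTrue(bufferPool.getLastBufferOperationTimestamp() >= before);

               // recycling the buffers should refresh the timestamp as well
               before = bufferPool.getLastBufferOperationTimestamp();
               bufferPool.recycle(segments);
               assertTrue(bufferPool.getLastBufferOperationTimestamp() >= before);
           }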

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);

Review comment:
       This may make the test unstable: for example, when the test environment is under heavy load, the recycle thread may fail to recycle buffers in time. Please increase the timeout to at least 60s.
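
       For example, the test could pass a generous timeout through the new package-private constructor so a slow recycle thread cannot cause a spurious failure (a sketch only; the executor and lock arguments are placeholders for whatever the test class already uses):

           readScheduler =
                   new SortMergeResultPartitionReadScheduler(
                           bufferPool, Runnable::run, new Object(), Duration.ofSeconds(60));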

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);

Review comment:
       nit: this timeout can be reduced to 1s to reduce the test time.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();

Review comment:
       nit: maybe use PriorityQueue directly

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
##########
@@ -147,21 +162,21 @@ public synchronized void run() {
         removeFinishedAndFailedReaders(numBuffersRead, finishedReaders);
     }
 
-    private Queue<MemorySegment> allocateBuffers(
-            Queue<SortMergeSubpartitionReader> availableReaders) {
+    Queue<MemorySegment> allocateBuffers(Queue<SortMergeSubpartitionReader> availableReaders) {

Review comment:
       nit: @VisibleForTesting
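
       i.e., the relaxed method could carry the annotation (a sketch of the signature only, assuming the usual org.apache.flink.annotation.VisibleForTesting import):

           @VisibleForTesting
           Queue<MemorySegment> allocateBuffers(Queue<SortMergeSubpartitionReader> availableReaders) {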

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));

Review comment:
       nit: may also check the return value
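
       For example (a sketch; the exact expected size depends on the pool's buffers-per-request setting):

           Queue<MemorySegment> allocatedBuffers =
                   readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
           assertFalse(allocatedBuffers.isEmpty());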

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 1000, false);
+
+        Thread.sleep(3500);
+
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestTimeoutAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();

Review comment:
       nit: maybe use PriorityQueue directly

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 1000, false);
+
+        Thread.sleep(3500);

Review comment:
       nit: reduce this time to, for example, 100ms to reduce the test time

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 1000, false);
+
+        Thread.sleep(3500);
+
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestTimeoutAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 400, true);
+
+        Thread.sleep(3500);
+
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+

Review comment:
       I think there is one more case we need to test: after we recycle one buffer, the timeout should be refreshed.
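
       A sketch of such a case, reusing this test's helpers (the recycle-thread arguments and the timings below are placeholders and assume a configured request timeout slightly longer than the first recycle delay):

           @Test
           public void testBufferRequestTimeoutRefreshedByRecycle() throws Exception {
               List<MemorySegment> buffers = occupyBuffers();

               SortMergeSubpartitionReader subpartitionReader =
                       readScheduler.createSubpartitionReader(
                               new NoOpBufferAvailablityListener(), 0, partitionedFile);
               Queue<SortMergeSubpartitionReader> allReaders = new PriorityQueue<>();
               allReaders.add(subpartitionReader);

               // recycle a single buffer shortly before the request timeout would fire and
               // the remaining buffers only after more than one timeout interval in total;
               // if the deadline is refreshed by the first recycle, the request still succeeds
               startBufferRecycleThread(bufferPool, buffers.subList(0, 1), 900, false);
               startBufferRecycleThread(bufferPool, buffers.subList(1, buffers.size()), 1800, false);

               readScheduler.allocateBuffers(allReaders);
               assertNull(subpartitionReader.getFailureCause());
           }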

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 1000, false);
+
+        Thread.sleep(3500);
+
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));

Review comment:
       nit: may also check the return value

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 1000, false);
+
+        Thread.sleep(3500);
+
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestTimeoutAfterRecycle() throws Exception {

Review comment:
       The motivation of this test seems unclear; could you please explain more?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();

Review comment:
       nit: maybe use PriorityQueue directly

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
##########
@@ -192,6 +203,132 @@ public void testOnReadBufferRequestError() throws Exception {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testBufferRequestTimeoutWithoutRecycle() throws Exception {
+        occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessWithRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        // Recycle the buffer after some time
+        Random random = new Random();
+        int recycleBufferWaitMs = random.nextInt(2999);
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, partitionedFile);
+
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        readScheduler.allocateBuffers(new PriorityQueue<>(allReaders));
+
+        assertNull(subpartitionReader.getFailureCause());
+    }
+
+    @Test
+    public void testBufferRequestSuccessAfterRecycle() throws Exception {
+        List<MemorySegment> buffers = occupyBuffers();
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.createSubpartitionReader(
+        startBufferRecycleThread(bufferPool, buffers, recycleBufferWaitMs, false);
+        Set<SortMergeSubpartitionReader> allReaders = new HashSet<>();
+        allReaders.add(subpartitionReader);
+        // Recycle the buffer after 1s
+        startBufferRecycleThread(bufferPool, buffers, 1000, false);

Review comment:
       nit: recycle directly (not waiting 1s) to reduce test time



