This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 936f1d76b3cf494cc98c0c81bb5acfe5b438d93d
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Nov 26 16:46:55 2021 +0800

    [FLINK-24954][network] Refresh read buffer request timeout on buffer recycling/requesting for sort-shuffle
    
    The read buffer request timeout of sort-shuffle was a little too
    aggressive: when a running task encounters data skew or is slow, a
    timeout exception may be triggered even though buffers are still being
    recycled and allocated. To improve this, the buffer request deadline is
    refreshed whenever at least one buffer is recycled or allocated, so a
    timeout exception can only be triggered after no buffer has been recycled
    or allocated for the whole timeout period.
    
    This closes #17936.
---
 .../io/disk/BatchShuffleReadBufferPool.java        |  12 +++
 .../SortMergeResultPartitionReadScheduler.java     |  38 ++++++--
 .../io/disk/BatchShuffleReadBufferPoolTest.java    |  28 ++++++
 .../SortMergeResultPartitionReadSchedulerTest.java | 107 +++++++++++++++++++++
 4 files changed, 176 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
index 1000a44..7b69936 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
@@ -75,6 +75,10 @@ public class BatchShuffleReadBufferPool {
     @GuardedBy("buffers")
     private final Queue<MemorySegment> buffers = new ArrayDeque<>();
 
+    /** The timestamp when the last buffer is recycled or allocated. */
+    @GuardedBy("buffers")
+    private long lastBufferOperationTimestamp = System.nanoTime();
+
     /** Whether this buffer pool has been destroyed or not. */
     @GuardedBy("buffers")
     private boolean destroyed;
@@ -203,6 +207,7 @@ public class BatchShuffleReadBufferPool {
             while (allocated.size() < numBuffersPerRequest) {
                 allocated.add(buffers.poll());
             }
+            lastBufferOperationTimestamp = System.nanoTime();
         }
         return allocated;
     }
@@ -236,12 +241,19 @@ public class BatchShuffleReadBufferPool {
             }
 
             buffers.addAll(segments);
+            lastBufferOperationTimestamp = System.nanoTime();
             if (buffers.size() >= numBuffersPerRequest) {
                 buffers.notifyAll();
             }
         }
     }
 
+    public long getLastBufferOperationTimestamp() {
+        synchronized (buffers) {
+            return lastBufferOperationTimestamp;
+        }
+    }
+
     /** Destroys this buffer pool and after which, no buffer can be allocated 
any more. */
     public void destroy() {
         synchronized (buffers) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index a55a0c0..7551dc8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
@@ -65,10 +64,10 @@ class SortMergeResultPartitionReadScheduler implements 
Runnable, BufferRecycler
             
LoggerFactory.getLogger(SortMergeResultPartitionReadScheduler.class);
 
     /**
-     * Maximum time (5min) to wait when requesting read buffers from the 
buffer pool before throwing
-     * an exception.
+     * Default maximum time (5min) to wait when requesting read buffers from 
the buffer pool before
+     * throwing an exception.
      */
-    private static final Duration BUFFER_REQUEST_TIMEOUT = 
Duration.ofMinutes(5);
+    private static final Duration DEFAULT_BUFFER_REQUEST_TIMEOUT = 
Duration.ofMinutes(5);
 
     /** Lock used to synchronize multi-thread access to thread-unsafe fields. 
*/
     private final Object lock;
@@ -88,6 +87,12 @@ class SortMergeResultPartitionReadScheduler implements 
Runnable, BufferRecycler
     /** Maximum number of buffers can be allocated by this partition reader. */
     private final int maxRequestedBuffers;
 
+    /**
+     * Maximum time to wait when requesting read buffers from the buffer pool 
before throwing an
+     * exception.
+     */
+    private final Duration bufferRequestTimeout;
+
     /** All failed subpartition readers to be released. */
     @GuardedBy("lock")
     private final Set<SortMergeSubpartitionReader> failedReaders = new 
HashSet<>();
@@ -121,12 +126,22 @@ class SortMergeResultPartitionReadScheduler implements 
Runnable, BufferRecycler
 
     SortMergeResultPartitionReadScheduler(
             BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object 
lock) {
+        this(bufferPool, ioExecutor, lock, DEFAULT_BUFFER_REQUEST_TIMEOUT);
+    }
+
+    SortMergeResultPartitionReadScheduler(
+            BatchShuffleReadBufferPool bufferPool,
+            Executor ioExecutor,
+            Object lock,
+            Duration bufferRequestTimeout) {
+
         this.lock = checkNotNull(lock);
         this.bufferPool = checkNotNull(bufferPool);
         this.ioExecutor = checkNotNull(ioExecutor);
         // one partition reader can consume at most 32M (the expected buffers 
per request is 8M)
         // buffers for data read. Currently, it is only an empirical value can 
not be configured
         this.maxRequestedBuffers = Math.max(1, 4 * 
bufferPool.getNumBuffersPerRequest());
+        this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout);
 
         // initialize the buffer pool eagerly to avoid reporting errors like 
OOM too late
         bufferPool.initialize();
@@ -147,21 +162,22 @@ class SortMergeResultPartitionReadScheduler implements 
Runnable, BufferRecycler
         removeFinishedAndFailedReaders(numBuffersRead, finishedReaders);
     }
 
-    private Queue<MemorySegment> allocateBuffers(
-            Queue<SortMergeSubpartitionReader> availableReaders) {
+    @VisibleForTesting
+    Queue<MemorySegment> allocateBuffers(Queue<SortMergeSubpartitionReader> 
availableReaders) {
         if (availableReaders.isEmpty()) {
             return new ArrayDeque<>();
         }
 
         try {
-            Deadline deadline = Deadline.fromNow(BUFFER_REQUEST_TIMEOUT);
-            while (deadline.hasTimeLeft()) {
+            long timeoutTime = getBufferRequestTimeoutTime();
+            do {
                 List<MemorySegment> buffers = bufferPool.requestBuffers();
                 if (!buffers.isEmpty()) {
                     return new ArrayDeque<>(buffers);
                 }
                 checkState(!isReleased, "Result partition has been already 
released.");
-            }
+            } while (System.nanoTime() < timeoutTime
+                    || System.nanoTime() < (timeoutTime = 
getBufferRequestTimeoutTime()));
 
             if (numRequestedBuffers <= 0) {
                 throw new TimeoutException(
@@ -178,6 +194,10 @@ class SortMergeResultPartitionReadScheduler implements 
Runnable, BufferRecycler
         return new ArrayDeque<>();
     }
 
+    private long getBufferRequestTimeoutTime() {
+        return bufferPool.getLastBufferOperationTimestamp() + 
bufferRequestTimeout.toNanos();
+    }
+
     private void releaseBuffers(Queue<MemorySegment> buffers) {
         if (!buffers.isEmpty()) {
             try {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
index 52823f0..6a3654b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java
@@ -99,6 +99,34 @@ public class BatchShuffleReadBufferPoolTest {
     }
 
     @Test
+    public void testBufferOperationTimestampUpdated() throws Exception {
+        BatchShuffleReadBufferPool bufferPool = new 
BatchShuffleReadBufferPool(1024, 1024);
+        long oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
+        Thread.sleep(100);
+        List<MemorySegment> buffers = bufferPool.requestBuffers();
+        assertEquals(1, buffers.size());
+        // The timestamp is updated when requesting buffers successfully
+        assertTrue(bufferPool.getLastBufferOperationTimestamp() > 
oldTimestamp);
+
+        oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
+        Thread.sleep(100);
+        bufferPool.recycle(buffers);
+        // The timestamp is updated when recycling buffers
+        assertTrue(bufferPool.getLastBufferOperationTimestamp() > 
oldTimestamp);
+
+        buffers = bufferPool.requestBuffers();
+
+        oldTimestamp = bufferPool.getLastBufferOperationTimestamp();
+        Thread.sleep(100);
+        assertEquals(0, bufferPool.requestBuffers().size());
+        // The timestamp is not updated when requesting buffers is failed
+        assertEquals(oldTimestamp, 
bufferPool.getLastBufferOperationTimestamp());
+
+        bufferPool.recycle(buffers);
+        bufferPool.destroy();
+    }
+
+    @Test
     public void testBufferFulfilledByRecycledBuffers() throws Exception {
         int numRequestThreads = 2;
         AtomicReference<Throwable> exception = new AtomicReference<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index ebdaf0b..78ef5fa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -29,7 +31,16 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -192,6 +203,102 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
         assertAllResourcesReleased();
     }
 
+    @Test
+    public void testRequestBufferTimeoutAndFailed() throws Exception {
+        Duration bufferRequestTimeout = Duration.ofSeconds(3);
+        List<MemorySegment> buffers = bufferPool.requestBuffers();
+        SortMergeResultPartitionReadScheduler readScheduler =
+                new SortMergeResultPartitionReadScheduler(
+                        bufferPool, executor, this, bufferRequestTimeout);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                readScheduler.crateSubpartitionReader(
+                        new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
+
+        PriorityQueue<SortMergeSubpartitionReader> allReaders = new 
PriorityQueue<>();
+        allReaders.add(subpartitionReader);
+
+        long startTimestamp = System.nanoTime();
+        Queue<MemorySegment> allocatedBuffers = 
readScheduler.allocateBuffers(allReaders);
+        long requestDuration = System.nanoTime() - startTimestamp;
+
+        assertEquals(0, allocatedBuffers.size());
+        assertTrue(requestDuration > bufferRequestTimeout.toNanos());
+        assertExpectedTimeoutException(subpartitionReader.getFailureCause());
+
+        bufferPool.recycle(buffers);
+        readScheduler.release();
+    }
+
+    @Test
+    public void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
+        Duration bufferRequestTimeout = Duration.ofSeconds(3);
+        FakeBatchShuffleReadBufferPool bufferPool =
+                new FakeBatchShuffleReadBufferPool(bufferSize * 3, bufferSize);
+        SortMergeResultPartitionReadScheduler readScheduler =
+                new SortMergeResultPartitionReadScheduler(
+                        bufferPool, executor, this, bufferRequestTimeout);
+
+        FileChannel dataFileChannel = 
openFileChannel(partitionedFile.getDataFilePath());
+        FileChannel indexFileChannel = 
openFileChannel(partitionedFile.getIndexFilePath());
+
+        PartitionedFileReader fileReader =
+                new PartitionedFileReader(partitionedFile, 0, dataFileChannel, 
indexFileChannel);
+
+        SortMergeSubpartitionReader subpartitionReader =
+                new SortMergeSubpartitionReader(new 
NoOpBufferAvailablityListener(), fileReader);
+
+        PriorityQueue<SortMergeSubpartitionReader> allReaders = new 
PriorityQueue<>();
+        allReaders.add(subpartitionReader);
+
+        long startTimestamp = System.nanoTime();
+        Queue<MemorySegment> allocatedBuffers = 
readScheduler.allocateBuffers(allReaders);
+        long requestDuration = System.nanoTime() - startTimestamp;
+
+        assertEquals(3, allocatedBuffers.size());
+        assertTrue(requestDuration > bufferRequestTimeout.toNanos() * 2);
+        assertNull(subpartitionReader.getFailureCause());
+
+        bufferPool.recycle(allocatedBuffers);
+        bufferPool.destroy();
+        dataFileChannel.close();
+        indexFileChannel.close();
+        readScheduler.release();
+    }
+
+    private static class FakeBatchShuffleReadBufferPool extends 
BatchShuffleReadBufferPool {
+        private final Queue<MemorySegment> requestedBuffers;
+
+        FakeBatchShuffleReadBufferPool(long totalBytes, int bufferSize) throws 
Exception {
+            super(totalBytes, bufferSize);
+            this.requestedBuffers = new LinkedList<>(requestBuffers());
+        }
+
+        @Override
+        public long getLastBufferOperationTimestamp() {
+            recycle(requestedBuffers.poll());
+            return super.getLastBufferOperationTimestamp();
+        }
+
+        @Override
+        public void destroy() {
+            recycle(requestedBuffers);
+            requestedBuffers.clear();
+            super.destroy();
+        }
+    }
+
+    private static FileChannel openFileChannel(Path path) throws IOException {
+        return FileChannel.open(path, StandardOpenOption.READ);
+    }
+
+    private static void assertExpectedTimeoutException(Throwable throwable) {
+        assertNotNull(throwable);
+        assertTrue(
+                ExceptionUtils.findThrowableWithMessage(throwable, "Buffer 
request timeout")
+                        .isPresent());
+    }
+
     private void assertAllResourcesReleased() {
         assertNull(readScheduler.getDataFileChannel());
         assertNull(readScheduler.getIndexFileChannel());

Reply via email to