This is an automated email from the ASF dual-hosted git repository. yingjie pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 936f1d76b3cf494cc98c0c81bb5acfe5b438d93d Author: Yuxin Tan <[email protected]> AuthorDate: Fri Nov 26 16:46:55 2021 +0800 [FLINK-24954][network] Refresh read buffer request timeout on buffer recycling/requesting for sort-shuffle The implementation of the read buffer request timeout for sort-shuffle is a little too aggressive. When a running task encounters data skew or runs slowly, a timeout exception may be triggered. To improve this situation, the buffer request timeout should be refreshed whenever at least one buffer is recycled or allocated, which avoids throwing a timeout exception. This closes #17936. --- .../io/disk/BatchShuffleReadBufferPool.java | 12 +++ .../SortMergeResultPartitionReadScheduler.java | 38 ++++++-- .../io/disk/BatchShuffleReadBufferPoolTest.java | 28 ++++++ .../SortMergeResultPartitionReadSchedulerTest.java | 107 +++++++++++++++++++++ 4 files changed, 176 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java index 1000a44..7b69936 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java @@ -75,6 +75,10 @@ public class BatchShuffleReadBufferPool { @GuardedBy("buffers") private final Queue<MemorySegment> buffers = new ArrayDeque<>(); + /** The timestamp when the last buffer is recycled or allocated. */ + @GuardedBy("buffers") + private long lastBufferOperationTimestamp = System.nanoTime(); + /** Whether this buffer pool has been destroyed or not.
*/ @GuardedBy("buffers") private boolean destroyed; @@ -203,6 +207,7 @@ public class BatchShuffleReadBufferPool { while (allocated.size() < numBuffersPerRequest) { allocated.add(buffers.poll()); } + lastBufferOperationTimestamp = System.nanoTime(); } return allocated; } @@ -236,12 +241,19 @@ public class BatchShuffleReadBufferPool { } buffers.addAll(segments); + lastBufferOperationTimestamp = System.nanoTime(); if (buffers.size() >= numBuffersPerRequest) { buffers.notifyAll(); } } } + public long getLastBufferOperationTimestamp() { + synchronized (buffers) { + return lastBufferOperationTimestamp; + } + } + /** Destroys this buffer pool and after which, no buffer can be allocated any more. */ public void destroy() { synchronized (buffers) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java index a55a0c0..7551dc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.time.Deadline; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; @@ -65,10 +64,10 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler LoggerFactory.getLogger(SortMergeResultPartitionReadScheduler.class); /** - * Maximum time (5min) to wait when requesting read buffers from the buffer pool before throwing - * an exception. 
+ * Default maximum time (5min) to wait when requesting read buffers from the buffer pool before + * throwing an exception. */ - private static final Duration BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5); + private static final Duration DEFAULT_BUFFER_REQUEST_TIMEOUT = Duration.ofMinutes(5); /** Lock used to synchronize multi-thread access to thread-unsafe fields. */ private final Object lock; @@ -88,6 +87,12 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler /** Maximum number of buffers can be allocated by this partition reader. */ private final int maxRequestedBuffers; + /** + * Maximum time to wait when requesting read buffers from the buffer pool before throwing an + * exception. + */ + private final Duration bufferRequestTimeout; + /** All failed subpartition readers to be released. */ @GuardedBy("lock") private final Set<SortMergeSubpartitionReader> failedReaders = new HashSet<>(); @@ -121,12 +126,22 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler SortMergeResultPartitionReadScheduler( BatchShuffleReadBufferPool bufferPool, Executor ioExecutor, Object lock) { + this(bufferPool, ioExecutor, lock, DEFAULT_BUFFER_REQUEST_TIMEOUT); + } + + SortMergeResultPartitionReadScheduler( + BatchShuffleReadBufferPool bufferPool, + Executor ioExecutor, + Object lock, + Duration bufferRequestTimeout) { + this.lock = checkNotNull(lock); this.bufferPool = checkNotNull(bufferPool); this.ioExecutor = checkNotNull(ioExecutor); // one partition reader can consume at most 32M (the expected buffers per request is 8M) // buffers for data read. 
Currently, it is only an empirical value can not be configured this.maxRequestedBuffers = Math.max(1, 4 * bufferPool.getNumBuffersPerRequest()); + this.bufferRequestTimeout = checkNotNull(bufferRequestTimeout); // initialize the buffer pool eagerly to avoid reporting errors like OOM too late bufferPool.initialize(); @@ -147,21 +162,22 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler removeFinishedAndFailedReaders(numBuffersRead, finishedReaders); } - private Queue<MemorySegment> allocateBuffers( - Queue<SortMergeSubpartitionReader> availableReaders) { + @VisibleForTesting + Queue<MemorySegment> allocateBuffers(Queue<SortMergeSubpartitionReader> availableReaders) { if (availableReaders.isEmpty()) { return new ArrayDeque<>(); } try { - Deadline deadline = Deadline.fromNow(BUFFER_REQUEST_TIMEOUT); - while (deadline.hasTimeLeft()) { + long timeoutTime = getBufferRequestTimeoutTime(); + do { List<MemorySegment> buffers = bufferPool.requestBuffers(); if (!buffers.isEmpty()) { return new ArrayDeque<>(buffers); } checkState(!isReleased, "Result partition has been already released."); - } + } while (System.nanoTime() < timeoutTime + || System.nanoTime() < (timeoutTime = getBufferRequestTimeoutTime())); if (numRequestedBuffers <= 0) { throw new TimeoutException( @@ -178,6 +194,10 @@ class SortMergeResultPartitionReadScheduler implements Runnable, BufferRecycler return new ArrayDeque<>(); } + private long getBufferRequestTimeoutTime() { + return bufferPool.getLastBufferOperationTimestamp() + bufferRequestTimeout.toNanos(); + } + private void releaseBuffers(Queue<MemorySegment> buffers) { if (!buffers.isEmpty()) { try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java index 52823f0..6a3654b 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPoolTest.java @@ -99,6 +99,34 @@ public class BatchShuffleReadBufferPoolTest { } @Test + public void testBufferOperationTimestampUpdated() throws Exception { + BatchShuffleReadBufferPool bufferPool = new BatchShuffleReadBufferPool(1024, 1024); + long oldTimestamp = bufferPool.getLastBufferOperationTimestamp(); + Thread.sleep(100); + List<MemorySegment> buffers = bufferPool.requestBuffers(); + assertEquals(1, buffers.size()); + // The timestamp is updated when requesting buffers successfully + assertTrue(bufferPool.getLastBufferOperationTimestamp() > oldTimestamp); + + oldTimestamp = bufferPool.getLastBufferOperationTimestamp(); + Thread.sleep(100); + bufferPool.recycle(buffers); + // The timestamp is updated when recycling buffers + assertTrue(bufferPool.getLastBufferOperationTimestamp() > oldTimestamp); + + buffers = bufferPool.requestBuffers(); + + oldTimestamp = bufferPool.getLastBufferOperationTimestamp(); + Thread.sleep(100); + assertEquals(0, bufferPool.requestBuffers().size()); + // The timestamp is not updated when requesting buffers is failed + assertEquals(oldTimestamp, bufferPool.getLastBufferOperationTimestamp()); + + bufferPool.recycle(buffers); + bufferPool.destroy(); + } + + @Test public void testBufferFulfilledByRecycledBuffers() throws Exception { int numRequestThreads = 2; AtomicReference<Throwable> exception = new AtomicReference<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java index ebdaf0b..78ef5fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -29,7 +31,16 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.rules.Timeout; +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.time.Duration; +import java.util.LinkedList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Queue; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -192,6 +203,102 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger { assertAllResourcesReleased(); } + @Test + public void testRequestBufferTimeoutAndFailed() throws Exception { + Duration bufferRequestTimeout = Duration.ofSeconds(3); + List<MemorySegment> buffers = bufferPool.requestBuffers(); + SortMergeResultPartitionReadScheduler readScheduler = + new SortMergeResultPartitionReadScheduler( + bufferPool, executor, this, bufferRequestTimeout); + + SortMergeSubpartitionReader subpartitionReader = + readScheduler.crateSubpartitionReader( + new NoOpBufferAvailablityListener(), 0, partitionedFile); + + PriorityQueue<SortMergeSubpartitionReader> allReaders = new PriorityQueue<>(); + allReaders.add(subpartitionReader); + + long startTimestamp = System.nanoTime(); + Queue<MemorySegment> allocatedBuffers = readScheduler.allocateBuffers(allReaders); + long requestDuration = System.nanoTime() - startTimestamp; + + assertEquals(0, 
allocatedBuffers.size()); + assertTrue(requestDuration > bufferRequestTimeout.toNanos()); + assertExpectedTimeoutException(subpartitionReader.getFailureCause()); + + bufferPool.recycle(buffers); + readScheduler.release(); + } + + @Test + public void testRequestTimeoutIsRefreshedAndSuccess() throws Exception { + Duration bufferRequestTimeout = Duration.ofSeconds(3); + FakeBatchShuffleReadBufferPool bufferPool = + new FakeBatchShuffleReadBufferPool(bufferSize * 3, bufferSize); + SortMergeResultPartitionReadScheduler readScheduler = + new SortMergeResultPartitionReadScheduler( + bufferPool, executor, this, bufferRequestTimeout); + + FileChannel dataFileChannel = openFileChannel(partitionedFile.getDataFilePath()); + FileChannel indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath()); + + PartitionedFileReader fileReader = + new PartitionedFileReader(partitionedFile, 0, dataFileChannel, indexFileChannel); + + SortMergeSubpartitionReader subpartitionReader = + new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader); + + PriorityQueue<SortMergeSubpartitionReader> allReaders = new PriorityQueue<>(); + allReaders.add(subpartitionReader); + + long startTimestamp = System.nanoTime(); + Queue<MemorySegment> allocatedBuffers = readScheduler.allocateBuffers(allReaders); + long requestDuration = System.nanoTime() - startTimestamp; + + assertEquals(3, allocatedBuffers.size()); + assertTrue(requestDuration > bufferRequestTimeout.toNanos() * 2); + assertNull(subpartitionReader.getFailureCause()); + + bufferPool.recycle(allocatedBuffers); + bufferPool.destroy(); + dataFileChannel.close(); + indexFileChannel.close(); + readScheduler.release(); + } + + private static class FakeBatchShuffleReadBufferPool extends BatchShuffleReadBufferPool { + private final Queue<MemorySegment> requestedBuffers; + + FakeBatchShuffleReadBufferPool(long totalBytes, int bufferSize) throws Exception { + super(totalBytes, bufferSize); + this.requestedBuffers = new 
LinkedList<>(requestBuffers()); + } + + @Override + public long getLastBufferOperationTimestamp() { + recycle(requestedBuffers.poll()); + return super.getLastBufferOperationTimestamp(); + } + + @Override + public void destroy() { + recycle(requestedBuffers); + requestedBuffers.clear(); + super.destroy(); + } + } + + private static FileChannel openFileChannel(Path path) throws IOException { + return FileChannel.open(path, StandardOpenOption.READ); + } + + private static void assertExpectedTimeoutException(Throwable throwable) { + assertNotNull(throwable); + assertTrue( + ExceptionUtils.findThrowableWithMessage(throwable, "Buffer request timeout") + .isPresent()); + } + private void assertAllResourcesReleased() { assertNull(readScheduler.getDataFileChannel()); assertNull(readScheduler.getIndexFileChannel());
