This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1ea2a7a5c90288dff08f702bac71de4d91c00f6f Author: kevin.cyj <[email protected]> AuthorDate: Thu Dec 30 21:56:15 2021 +0800 [FLINK-25407][network] Fix the issues caused by FLINK-24035 This PR tries to fix the issues caused by FLINK-24035. More specifically, there are two issues: the first is the deadlock caused by acquiring the 'factoryLock' in NetworkBufferPool, and the other is the incorrect decrease of the required segments of NetworkBufferPool. Both issues occur during exception handling when requesting segments. Actually, when reserving memory segments for LocalBufferPool, there is no need to modify the value of required segments. As a result, there is no n [...] This closes #18173. --- .../io/network/buffer/NetworkBufferPool.java | 17 ++++--- .../io/network/buffer/LocalBufferPoolTest.java | 59 ++++++++++++++++++++++ 2 files changed, 70 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 509db03..d9717a4 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -208,7 +208,13 @@ public class NetworkBufferPool tryRedistributeBuffers(numberOfSegmentsToRequest); } - return internalRequestMemorySegments(numberOfSegmentsToRequest); + try { + return internalRequestMemorySegments(numberOfSegmentsToRequest); + } catch (IOException exception) { + revertRequiredBuffers(numberOfSegmentsToRequest); + ExceptionUtils.rethrowIOException(exception); + return null; + } } private List<MemorySegment> internalRequestMemorySegments(int numberOfSegmentsToRequest) @@ -248,7 +254,7 @@ public class NetworkBufferPool } } } catch (Throwable e) { - recycleMemorySegments(segments, numberOfSegmentsToRequest); + 
internalRecycleMemorySegments(segments); ExceptionUtils.rethrowIOException(e); } @@ -272,12 +278,11 @@ public class NetworkBufferPool */ @Override public void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) { - recycleMemorySegments(segments, segments.size()); - } - - private void recycleMemorySegments(Collection<MemorySegment> segments, int size) { internalRecycleMemorySegments(segments); + revertRequiredBuffers(segments.size()); + } + private void revertRequiredBuffers(int size) { synchronized (factoryLock) { numTotalRequiredBuffers -= size; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java index f77275a..27ad673 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java @@ -127,6 +127,65 @@ public class LocalBufferPoolTest extends TestLogger { } } + @Test(timeout = 10000) // timeout can indicate a potential deadlock + public void testReserveSegmentsAndCancel() throws Exception { + int totalSegments = 4; + int segmentsToReserve = 2; + + NetworkBufferPool globalPool = new NetworkBufferPool(totalSegments, memorySegmentSize); + BufferPool localPool1 = globalPool.createBufferPool(segmentsToReserve, totalSegments); + List<MemorySegment> segments = new ArrayList<>(); + + try { + for (int i = 0; i < totalSegments; ++i) { + segments.add(localPool1.requestMemorySegmentBlocking()); + } + + BufferPool localPool2 = globalPool.createBufferPool(segmentsToReserve, totalSegments); + // the segment reserve thread will be blocked for no buffer is available + Thread reserveThread = + new Thread( + () -> { + try { + localPool2.reserveSegments(segmentsToReserve); + } catch (Throwable ignored) { + } + }); + reserveThread.start(); + Thread.sleep(100); // wait to be 
blocked + + // the cancel thread can be blocked when redistributing buffers + Thread cancelThread = + new Thread( + () -> { + localPool1.lazyDestroy(); + localPool2.lazyDestroy(); + }); + cancelThread.start(); + + // it is expected that the segment reserve thread can be cancelled successfully + Thread interruptThread = + new Thread( + () -> { + try { + do { + reserveThread.interrupt(); + Thread.sleep(100); + } while (reserveThread.isAlive() || cancelThread.isAlive()); + } catch (Throwable ignored) { + } + }); + interruptThread.start(); + + interruptThread.join(); + } finally { + segments.forEach(localPool1::recycle); + localPool1.lazyDestroy(); + assertEquals(0, globalPool.getNumberOfUsedMemorySegments()); + globalPool.destroy(); + } + } + @Test public void testRequestMoreThanAvailable() { localBufferPool.setNumBuffers(numBuffers);
