This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7784ec7284bfc26a2561d8cd9ec3ab4aead6c104 Author: kevin.cyj <[email protected]> AuthorDate: Thu Dec 30 21:56:15 2021 +0800 [FLINK-25407][network] Fix the issues caused by FLINK-24035 This PR tries to fix the issues caused by FLINK-24035. More specifically, there are two issues: the first one is a deadlock caused by acquiring the 'factoryLock' in NetworkBufferPool, and the other is an incorrect decrement of the required segments of NetworkBufferPool. Both issues occur during exception handling when requesting segments. Actually, when reserving memory segments for LocalBufferPool, there is no need to modify the value of required segments. As a result, there is no n [...] This closes #18173. --- .../io/network/buffer/NetworkBufferPool.java | 17 ++++--- .../io/network/buffer/LocalBufferPoolTest.java | 59 ++++++++++++++++++++++ 2 files changed, 70 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java index 509db03..d9717a4 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java @@ -208,7 +208,13 @@ public class NetworkBufferPool tryRedistributeBuffers(numberOfSegmentsToRequest); } - return internalRequestMemorySegments(numberOfSegmentsToRequest); + try { + return internalRequestMemorySegments(numberOfSegmentsToRequest); + } catch (IOException exception) { + revertRequiredBuffers(numberOfSegmentsToRequest); + ExceptionUtils.rethrowIOException(exception); + return null; + } } private List<MemorySegment> internalRequestMemorySegments(int numberOfSegmentsToRequest) @@ -248,7 +254,7 @@ public class NetworkBufferPool } } } catch (Throwable e) { - recycleMemorySegments(segments, numberOfSegmentsToRequest); + 
internalRecycleMemorySegments(segments); ExceptionUtils.rethrowIOException(e); } @@ -272,12 +278,11 @@ public class NetworkBufferPool */ @Override public void recycleUnpooledMemorySegments(Collection<MemorySegment> segments) { - recycleMemorySegments(segments, segments.size()); - } - - private void recycleMemorySegments(Collection<MemorySegment> segments, int size) { internalRecycleMemorySegments(segments); + revertRequiredBuffers(segments.size()); + } + private void revertRequiredBuffers(int size) { synchronized (factoryLock) { numTotalRequiredBuffers -= size; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java index b4b56fe..57edaca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java @@ -135,6 +135,65 @@ public class LocalBufferPoolTest extends TestLogger { } } + @Test(timeout = 10000) // timeout can indicate a potential deadlock + public void testReserveSegmentsAndCancel() throws Exception { + int totalSegments = 4; + int segmentsToReserve = 2; + + NetworkBufferPool globalPool = new NetworkBufferPool(totalSegments, memorySegmentSize); + BufferPool localPool1 = globalPool.createBufferPool(segmentsToReserve, totalSegments); + List<MemorySegment> segments = new ArrayList<>(); + + try { + for (int i = 0; i < totalSegments; ++i) { + segments.add(localPool1.requestMemorySegmentBlocking()); + } + + BufferPool localPool2 = globalPool.createBufferPool(segmentsToReserve, totalSegments); + // the segment reserve thread will be blocked for no buffer is available + Thread reserveThread = + new Thread( + () -> { + try { + localPool2.reserveSegments(segmentsToReserve); + } catch (Throwable ignored) { + } + }); + reserveThread.start(); + Thread.sleep(100); // wait to be 
blocked + + // the cancel thread can be blocked when redistributing buffers + Thread cancelThread = + new Thread( + () -> { + localPool1.lazyDestroy(); + localPool2.lazyDestroy(); + }); + cancelThread.start(); + + // it is expected that the segment reserve thread can be cancelled successfully + Thread interruptThread = + new Thread( + () -> { + try { + do { + reserveThread.interrupt(); + Thread.sleep(100); + } while (reserveThread.isAlive() || cancelThread.isAlive()); + } catch (Throwable ignored) { + } + }); + interruptThread.start(); + + interruptThread.join(); + } finally { + segments.forEach(localPool1::recycle); + localPool1.lazyDestroy(); + assertEquals(0, globalPool.getNumberOfUsedMemorySegments()); + globalPool.destroy(); + } + } + @Test public void testRequestMoreThanAvailable() { localBufferPool.setNumBuffers(numBuffers);
