This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 6464749  [FLINK-21552][runtime] Unreserve managed memory if OpaqueMemoryResource cannot be initialized.
6464749 is described below

commit 64647490f3e96bdbdfe535654c667f1ead0b026c
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Tue Mar 2 16:21:28 2021 +0800

    [FLINK-21552][runtime] Unreserve managed memory if OpaqueMemoryResource cannot be initialized.

    This closes #15057
---
 .../org/apache/flink/runtime/memory/MemoryManager.java  |  7 ++++++-
 .../memory/MemoryManagerSharedResourcesTest.java        | 17 +++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 2ef80fa..cf44e79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -540,7 +540,12 @@ public class MemoryManager {
                                 e);
                     }
 
-                    return initializer.apply(size);
+                    try {
+                        return initializer.apply(size);
+                    } catch (Throwable t) {
+                        releaseMemory(type, size);
+                        throw t;
+                    }
                 };
 
         final Consumer<Long> releaser = (size) -> releaseMemory(type, size);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
index d224a15..9c4fb9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
@@ -224,6 +224,23 @@ public class MemoryManagerSharedResourcesTest {
         assertTrue(resource.getResourceHandle().closed);
     }
 
+    @Test
+    public void testAllocateResourceInitializeFail() {
+        final MemoryManager memoryManager = createMemoryManager();
+
+        try {
+            memoryManager.getSharedMemoryResourceForManagedMemory(
+                    "type",
+                    (ignore) -> {
+                        throw new RuntimeException("initialization fail");
+                    },
+                    0.1);
+            fail("expect to fail");
+        } catch (Throwable t) {
+            // expected
+        }
+        assertTrue(memoryManager.verifyEmpty());
+    }
     // ------------------------------------------------------------------------
     //  Utils
     // ------------------------------------------------------------------------