This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 6464749  [FLINK-21552][runtime] Unreserve managed memory if 
OpaqueMemoryResource cannot be initialized.
6464749 is described below

commit 64647490f3e96bdbdfe535654c667f1ead0b026c
Author: Xintong Song <tonysong...@gmail.com>
AuthorDate: Tue Mar 2 16:21:28 2021 +0800

    [FLINK-21552][runtime] Unreserve managed memory if OpaqueMemoryResource 
cannot be initialized.
    
    This closes #15057
---
 .../org/apache/flink/runtime/memory/MemoryManager.java  |  7 ++++++-
 .../memory/MemoryManagerSharedResourcesTest.java        | 17 +++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 2ef80fa..cf44e79 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -540,7 +540,12 @@ public class MemoryManager {
                                 e);
                     }
 
-                    return initializer.apply(size);
+                    try {
+                        return initializer.apply(size);
+                    } catch (Throwable t) {
+                        releaseMemory(type, size);
+                        throw t;
+                    }
                 };
 
         final Consumer<Long> releaser = (size) -> releaseMemory(type, size);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
index d224a15..9c4fb9e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
@@ -224,6 +224,23 @@ public class MemoryManagerSharedResourcesTest {
         assertTrue(resource.getResourceHandle().closed);
     }
 
+    @Test
+    public void testAllocateResourceInitializeFail() {
+        final MemoryManager memoryManager = createMemoryManager();
+
+        try {
+            memoryManager.getSharedMemoryResourceForManagedMemory(
+                    "type",
+                    (ignore) -> {
+                        throw new RuntimeException("initialization fail");
+                    },
+                    0.1);
+            fail("expect to fail");
+        } catch (Throwable t) {
+            // expected
+        }
+        assertTrue(memoryManager.verifyEmpty());
+    }
     // ------------------------------------------------------------------------
     //  Utils
     // ------------------------------------------------------------------------

Reply via email to