This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new a0eeac3  [FLINK-21753][runtime] Don't cycle reference between memory 
manager and gc cleaner action
a0eeac3 is described below

commit a0eeac3bdb7f83f9b73cdcdd8b507f6d0069115c
Author: Kezhu Wang <[email protected]>
AuthorDate: Tue Mar 16 00:51:43 2021 +0800

    [FLINK-21753][runtime] Don't cycle reference between memory manager and gc 
cleaner action
    
    This closes #15224
---
 .../flink/core/memory/MemorySegmentFactory.java    |  6 +++---
 .../apache/flink/runtime/memory/MemoryManager.java | 14 ++++++------
 .../flink/runtime/memory/UnsafeMemoryBudget.java   | 12 +++++++++++
 .../flink/runtime/memory/MemoryManagerTest.java    | 25 ++++++++++++++++++++++
 4 files changed, 48 insertions(+), 9 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index 6016d4c..b66d716 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -169,14 +169,14 @@ public final class MemorySegmentFactory {
      *
      * @param size The size of the off-heap unsafe memory segment to allocate.
      * @param owner The owner to associate with the off-heap unsafe memory 
segment.
-     * @param customCleanupAction A custom action to run upon calling GC 
cleaner.
+     * @param gcCleanupAction A custom action to run upon calling GC cleaner.
      * @return A new memory segment, backed by off-heap unsafe memory.
      */
     public static MemorySegment allocateOffHeapUnsafeMemory(
-            int size, Object owner, Runnable customCleanupAction) {
+            int size, Object owner, Runnable gcCleanupAction) {
         long address = MemoryUtils.allocateUnsafe(size);
         ByteBuffer offHeapBuffer = 
MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
-        MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, 
customCleanupAction);
+        MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, 
gcCleanupAction);
         return new HybridMemorySegment(offHeapBuffer, owner);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 23a8224..75ea629 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -156,6 +156,12 @@ public class MemoryManager {
         }
     }
 
+    /** Gets memory budget. */
+    @VisibleForTesting
+    public UnsafeMemoryBudget getMemoryBudget() {
+        return memoryBudget;
+    }
+
     /**
      * Checks whether the MemoryManager has been shut down.
      *
@@ -234,7 +240,7 @@ public class MemoryManager {
                     String.format("Could not allocate %d pages", 
numberOfPages), e);
         }
 
-        Runnable pageCleanup = this::releasePage;
+        Runnable gcCleanup = 
memoryBudget.getReleaseMemoryAction(getPageSize());
         allocatedSegments.compute(
                 owner,
                 (o, currentSegmentsForOwner) -> {
@@ -244,7 +250,7 @@ public class MemoryManager {
                                     : currentSegmentsForOwner;
                     for (long i = numberOfPages; i > 0; i--) {
                         MemorySegment segment =
-                                allocateOffHeapUnsafeMemory(getPageSize(), 
owner, pageCleanup);
+                                allocateOffHeapUnsafeMemory(getPageSize(), 
owner, gcCleanup);
                         target.add(segment);
                         segmentsForOwner.add(segment);
                     }
@@ -254,10 +260,6 @@ public class MemoryManager {
         Preconditions.checkState(!isShutDown, "Memory manager has been 
concurrently shut down.");
     }
 
-    private void releasePage() {
-        memoryBudget.releaseMemory(getPageSize());
-    }
-
     /**
      * Tries to release the memory for the specified segment.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
index 063030c..b4f7446 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
@@ -213,4 +213,16 @@ class UnsafeMemoryBudget {
                             size, currentAvailableMemorySize, 
totalMemorySize));
         }
     }
+
+    /**
+     * Generates a release memory action that can be performed later.
+     *
+     * <p>The generated runnable can be safely referenced by a GC cleaner action without
+     * creating a reference cycle back to the memory manager.
+     */
+    Runnable getReleaseMemoryAction(@Nonnegative long size) {
+        return () -> {
+            releaseMemory(size);
+        };
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index 9c6fe38..2f5884a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -35,6 +35,7 @@ import java.util.Random;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /** Tests for the memory manager. */
@@ -404,4 +405,28 @@ public class MemoryManagerTest {
             // expected
         }
     }
+
+    private UnsafeMemoryBudget allocateLeakingPagesAndGetBudget() throws 
MemoryAllocationException {
+        // We create a new memory manager here since we want it and all its 
segments to be leaking.
+        MemoryManager memoryManager =
+                MemoryManagerBuilder.newBuilder()
+                        .setMemorySize(MEMORY_SIZE)
+                        .setPageSize(PAGE_SIZE)
+                        .build();
+        memoryManager.allocatePages(new Object(), (int) 
memoryManager.getMemorySize() / PAGE_SIZE);
+
+        return memoryManager.getMemoryBudget();
+    }
+
+    @Test
+    public void testGcCleanup() throws Exception {
+        UnsafeMemoryBudget memoryBudget = allocateLeakingPagesAndGetBudget();
+        for (int i = 0;
+                i < 20 && memoryBudget.getAvailableMemorySize() < 
memoryBudget.getTotalMemorySize();
+                i++) {
+            System.gc();
+            Thread.sleep(50);
+        }
+        assertTrue(memoryBudget.verifyEmpty());
+    }
 }

Reply via email to