This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new a0eeac3 [FLINK-21753][runtime] Don't cycle reference between memory
manager and gc cleaner action
a0eeac3 is described below
commit a0eeac3bdb7f83f9b73cdcdd8b507f6d0069115c
Author: Kezhu Wang <[email protected]>
AuthorDate: Tue Mar 16 00:51:43 2021 +0800
[FLINK-21753][runtime] Don't cycle reference between memory manager and gc
cleaner action
This closes #15224
---
.../flink/core/memory/MemorySegmentFactory.java | 6 +++---
.../apache/flink/runtime/memory/MemoryManager.java | 14 ++++++------
.../flink/runtime/memory/UnsafeMemoryBudget.java | 12 +++++++++++
.../flink/runtime/memory/MemoryManagerTest.java | 25 ++++++++++++++++++++++
4 files changed, 48 insertions(+), 9 deletions(-)
diff --git
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index 6016d4c..b66d716 100644
---
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -169,14 +169,14 @@ public final class MemorySegmentFactory {
*
* @param size The size of the off-heap unsafe memory segment to allocate.
* @param owner The owner to associate with the off-heap unsafe memory
segment.
- * @param customCleanupAction A custom action to run upon calling GC
cleaner.
+ * @param gcCleanupAction A custom action to run upon calling GC cleaner.
* @return A new memory segment, backed by off-heap unsafe memory.
*/
public static MemorySegment allocateOffHeapUnsafeMemory(
- int size, Object owner, Runnable customCleanupAction) {
+ int size, Object owner, Runnable gcCleanupAction) {
long address = MemoryUtils.allocateUnsafe(size);
ByteBuffer offHeapBuffer =
MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
- MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address,
customCleanupAction);
+ MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address,
gcCleanupAction);
return new HybridMemorySegment(offHeapBuffer, owner);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 23a8224..75ea629 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -156,6 +156,12 @@ public class MemoryManager {
}
}
+ /** Gets memory budget. */
+ @VisibleForTesting
+ public UnsafeMemoryBudget getMemoryBudget() {
+ return memoryBudget;
+ }
+
/**
* Checks whether the MemoryManager has been shut down.
*
@@ -234,7 +240,7 @@ public class MemoryManager {
String.format("Could not allocate %d pages",
numberOfPages), e);
}
- Runnable pageCleanup = this::releasePage;
+ Runnable gcCleanup =
memoryBudget.getReleaseMemoryAction(getPageSize());
allocatedSegments.compute(
owner,
(o, currentSegmentsForOwner) -> {
@@ -244,7 +250,7 @@ public class MemoryManager {
: currentSegmentsForOwner;
for (long i = numberOfPages; i > 0; i--) {
MemorySegment segment =
- allocateOffHeapUnsafeMemory(getPageSize(),
owner, pageCleanup);
+ allocateOffHeapUnsafeMemory(getPageSize(),
owner, gcCleanup);
target.add(segment);
segmentsForOwner.add(segment);
}
@@ -254,10 +260,6 @@ public class MemoryManager {
Preconditions.checkState(!isShutDown, "Memory manager has been
concurrently shut down.");
}
- private void releasePage() {
- memoryBudget.releaseMemory(getPageSize());
- }
-
/**
* Tries to release the memory for the specified segment.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
index 063030c..b4f7446 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/UnsafeMemoryBudget.java
@@ -213,4 +213,16 @@ class UnsafeMemoryBudget {
size, currentAvailableMemorySize,
totalMemorySize));
}
}
+
+ /**
+ * Generates a release memory action that can be performed later.
+ *
+ * <p>The generated runnable can be safely referenced by a GC cleaner action
+ * without creating a reference cycle back to the memory manager.
+ */
+ Runnable getReleaseMemoryAction(@Nonnegative long size) {
+ return () -> {
+ releaseMemory(size);
+ };
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index 9c6fe38..2f5884a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -35,6 +35,7 @@ import java.util.Random;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Tests for the memory manager. */
@@ -404,4 +405,28 @@ public class MemoryManagerTest {
// expected
}
}
+
+ private UnsafeMemoryBudget allocateLeakingPagesAndGetBudget() throws
MemoryAllocationException {
+ // We create a new memory manager here since we want it and all its
segments to be leaking.
+ MemoryManager memoryManager =
+ MemoryManagerBuilder.newBuilder()
+ .setMemorySize(MEMORY_SIZE)
+ .setPageSize(PAGE_SIZE)
+ .build();
+ memoryManager.allocatePages(new Object(), (int)
memoryManager.getMemorySize() / PAGE_SIZE);
+
+ return memoryManager.getMemoryBudget();
+ }
+
+ @Test
+ public void testGcCleanup() throws Exception {
+ UnsafeMemoryBudget memoryBudget = allocateLeakingPagesAndGetBudget();
+ for (int i = 0;
+ i < 20 && memoryBudget.getAvailableMemorySize() <
memoryBudget.getTotalMemorySize();
+ i++) {
+ System.gc();
+ Thread.sleep(50);
+ }
+ assertTrue(memoryBudget.verifyEmpty());
+ }
}