carp84 commented on a change in pull request #9693: [FLINK-13984] Separate
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r335641273
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
##########
@@ -474,67 +424,141 @@ public void releaseAll(Object owner) {
return;
}
- // -------------------- BEGIN CRITICAL SECTION
-------------------
- synchronized (lock) {
- if (isShutDown) {
- throw new IllegalStateException("Memory manager
has been shut down.");
- }
-
- // get all segments
- final Set<MemorySegment> segments =
allocatedSegments.remove(owner);
+ Preconditions.checkState(!isShutDown, "Memory manager has been
shut down.");
- // all segments may have been freed previously
individually
- if (segments == null || segments.isEmpty()) {
- return;
- }
+ // get all segments
+ Set<MemorySegment> segments = allocatedSegments.remove(owner);
- // free each segment
- if (isPreAllocated) {
- for (MemorySegment seg : segments) {
- memoryPool.returnSegmentToPool(seg);
- }
- }
- else {
- for (MemorySegment seg : segments) {
- seg.free();
- }
- numNonAllocatedPages += segments.size();
- }
+ // all segments may have been freed previously individually
+ if (segments == null || segments.isEmpty()) {
+ return;
+ }
- segments.clear();
+ // free each segment
+ EnumMap<MemoryType, Long> releasedMemory = new
EnumMap<>(MemoryType.class);
+ for (MemorySegment segment : segments) {
+ segment.free();
+ releaseSegment(segment, releasedMemory);
}
- // -------------------- END CRITICAL SECTION -------------------
+ budgetByType.releaseBudgetForKeys(releasedMemory);
+
+ segments.clear();
}
- //
------------------------------------------------------------------------
- // Properties, sizes and size conversions
- //
------------------------------------------------------------------------
+ /**
+ * Reserves memory of a certain type for an owner from this memory
manager.
+ *
+ * @param owner The owner to associate with the memory reservation, for
the fallback release.
+ * @param memoryType type of memory to reserve (heap / off-heap).
+ * @param size size of memory to reserve.
+ * @throws MemoryAllocationException Thrown, if this memory manager
does not have the requested amount
+ * of memory any more.
+ */
+ public void reserveMemory(Object owner, MemoryType memoryType, long
size) throws MemoryAllocationException {
+ checkMemoryReservationPreconditions(owner, memoryType, size);
+ if (size == 0L) {
+ return;
+ }
+
+ long acquiredMemory =
budgetByType.acquireBudgetForKey(memoryType, size);
+ if (acquiredMemory < size) {
+ throw new MemoryAllocationException(
+ String.format("Could not allocate %d bytes.
Only %d bytes are remaining.", size, acquiredMemory));
+ }
+
+ reservedMemory.compute(owner, (o, reservations) -> {
+ Map<MemoryType, Long> newReservations = reservations;
+ if (reservations == null) {
+ newReservations = new
EnumMap<>(MemoryType.class);
+ newReservations.put(memoryType, size);
+ } else {
+ reservations.compute(
+ memoryType,
+ (mt, currentlyReserved) ->
currentlyReserved == null ? size : currentlyReserved + size);
+ }
+ return newReservations;
+ });
+
+ Preconditions.checkState(!isShutDown, "Memory manager has been
concurrently shut down.");
+ }
/**
- * Gets the type of memory (heap / off-heap) managed by this memory
manager.
+ * Releases memory of a certain type from an owner to this memory
manager.
*
- * @return The type of memory managed by this memory manager.
+ * @param owner The owner to associate with the memory reservation, for
the fallback release.
+ * @param memoryType type of memory to release (heap / off-heap).
+ * @param size size of memory to release.
*/
- public MemoryType getMemoryType() {
- return memoryType;
+ public void releaseMemory(Object owner, MemoryType memoryType, long
size) {
+ checkMemoryReservationPreconditions(owner, memoryType, size);
+ if (size == 0L) {
+ return;
+ }
+
+ reservedMemory.compute(owner, (o, reservations) -> {
Review comment:
It seems we "fail silently" here if the `compute` result is `null`, which
means we can no longer find the owner in the map. Is it possible for an owner
to release its reserved memory multiple times? If so, is it worth logging
here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services