carp84 commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r335641273
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##########
 @@ -474,67 +424,141 @@ public void releaseAll(Object owner) {
                        return;
                }
 
-               // -------------------- BEGIN CRITICAL SECTION 
-------------------
-               synchronized (lock) {
-                       if (isShutDown) {
-                               throw new IllegalStateException("Memory manager 
has been shut down.");
-                       }
-
-                       // get all segments
-                       final Set<MemorySegment> segments = 
allocatedSegments.remove(owner);
+               Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
-                       // all segments may have been freed previously 
individually
-                       if (segments == null || segments.isEmpty()) {
-                               return;
-                       }
+               // get all segments
+               Set<MemorySegment> segments = allocatedSegments.remove(owner);
 
-                       // free each segment
-                       if (isPreAllocated) {
-                               for (MemorySegment seg : segments) {
-                                       memoryPool.returnSegmentToPool(seg);
-                               }
-                       }
-                       else {
-                               for (MemorySegment seg : segments) {
-                                       seg.free();
-                               }
-                               numNonAllocatedPages += segments.size();
-                       }
+               // all segments may have been freed previously individually
+               if (segments == null || segments.isEmpty()) {
+                       return;
+               }
 
-                       segments.clear();
+               // free each segment
+               EnumMap<MemoryType, Long> releasedMemory = new 
EnumMap<>(MemoryType.class);
+               for (MemorySegment segment : segments) {
+                       segment.free();
+                       releaseSegment(segment, releasedMemory);
                }
-               // -------------------- END CRITICAL SECTION -------------------
+               budgetByType.releaseBudgetForKeys(releasedMemory);
+
+               segments.clear();
        }
 
-       // 
------------------------------------------------------------------------
-       //  Properties, sizes and size conversions
-       // 
------------------------------------------------------------------------
+       /**
+        * Reserves memory of a certain type for an owner from this memory 
manager.
+        *
+        * @param owner The owner to associate with the memory reservation, for 
the fallback release.
+        * @param memoryType type of memory to reserve (heap / off-heap).
+        * @param size size of memory to reserve.
+        * @throws MemoryAllocationException Thrown, if this memory manager 
does not have the requested amount
+        *                                   of memory any more.
+        */
+       public void reserveMemory(Object owner, MemoryType memoryType, long 
size) throws MemoryAllocationException {
+               checkMemoryReservationPreconditions(owner, memoryType, size);
+               if (size == 0L) {
+                       return;
+               }
+
+               long acquiredMemory = 
budgetByType.acquireBudgetForKey(memoryType, size);
+               if (acquiredMemory < size) {
+                       throw new MemoryAllocationException(
+                               String.format("Could not allocate %d bytes. 
Only %d bytes are remaining.", size, acquiredMemory));
+               }
+
+               reservedMemory.compute(owner, (o, reservations) -> {
+                       Map<MemoryType, Long> newReservations = reservations;
+                       if (reservations == null) {
+                               newReservations = new 
EnumMap<>(MemoryType.class);
+                               newReservations.put(memoryType, size);
+                       } else {
+                               reservations.compute(
+                                       memoryType,
+                                       (mt, currentlyReserved) -> 
currentlyReserved == null ? size : currentlyReserved + size);
+                       }
+                       return newReservations;
+               });
+
+               Preconditions.checkState(!isShutDown, "Memory manager has been 
concurrently shut down.");
+       }
 
        /**
-        * Gets the type of memory (heap / off-heap) managed by this memory 
manager.
+        * Releases memory of a certain type from an owner to this memory 
manager.
         *
-        * @return The type of memory managed by this memory manager.
+        * @param owner The owner to associate with the memory reservation, for 
the fallback release.
+        * @param memoryType type of memory to release (heap / off-heap).
+        * @param size size of memory to release.
         */
-       public MemoryType getMemoryType() {
-               return memoryType;
+       public void releaseMemory(Object owner, MemoryType memoryType, long 
size) {
+               checkMemoryReservationPreconditions(owner, memoryType, size);
+               if (size == 0L) {
+                       return;
+               }
+
+               reservedMemory.compute(owner, (o, reservations) -> {
 
 Review comment:
   It seems we're using a "fail silently" way here if the `compute` result is 
`null` which means we cannot find the owner in the map anymore. Is it possible 
that one releases its reserved memory multiple times? If so, does it worth any 
logging here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to