tillrohrmann commented on a change in pull request #11109: [FLINK-15758][MemManager] Release segment and its unsafe memory in GC Cleaner URL: https://github.com/apache/flink/pull/11109#discussion_r410218003
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ########## @@ -83,41 +73,47 @@ private final Map<Object, Set<MemorySegment>> allocatedSegments; /** Reserved memory per memory owner. */ - private final Map<Object, Map<MemoryType, Long>> reservedMemory; + private final Map<Object, Long> reservedMemory; + + private final long pageSize; + + private final long totalMemorySize; + + private final long totalNumberOfPages; - private final KeyedBudgetManager<MemoryType> budgetByType; + private final AtomicLong availableMemorySize; private final SharedResources sharedResources; /** Flag whether the close() has already been invoked. */ private volatile boolean isShutDown; /** - * Creates a memory manager with the given memory types, capacity and given page size. + * Creates a memory manager with the given capacity and given page size. * - * @param memorySizeByType The total size of the memory to be managed by this memory manager for each type (heap / off-heap). + * @param memorySize The total size of the off-heap memory to be managed by this memory manager. * @param pageSize The size of the pages handed out by the memory manager. 
*/ - public MemoryManager(Map<MemoryType, Long> memorySizeByType, int pageSize) { - for (Entry<MemoryType, Long> sizeForType : memorySizeByType.entrySet()) { - sanityCheck(sizeForType.getValue(), pageSize, sizeForType.getKey()); - } + public MemoryManager(long memorySize, int pageSize) { + sanityCheck(memorySize, pageSize); + this.pageSize = pageSize; + this.totalMemorySize = memorySize; + this.totalNumberOfPages = memorySize / pageSize; this.allocatedSegments = new ConcurrentHashMap<>(); this.reservedMemory = new ConcurrentHashMap<>(); - this.budgetByType = new KeyedBudgetManager<>(memorySizeByType, pageSize); + this.availableMemorySize = new AtomicLong(totalMemorySize); this.sharedResources = new SharedResources(); - verifyIntTotalNumberOfPages(memorySizeByType, budgetByType.maxTotalNumberOfPages()); + verifyIntTotalNumberOfPages(totalMemorySize, totalNumberOfPages); LOG.debug( "Initialized MemoryManager with total memory size {} ({}), page size {}.", - budgetByType.totalAvailableBudget(), - memorySizeByType, + memorySize, + memorySize, Review comment: I think we can remove one `memorySize` here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services