This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e70f5853895cfa96c665b2f7bec0e62216f7f507
Author: Andrey Zagrebin <[email protected]>
AuthorDate: Thu Jan 30 14:19:39 2020 +0100

    [FLINK-15758][MemManager] Remove KeyedBudgetManager and use AtomicLong
---
 .../org/apache/flink/core/memory/MemoryType.java   |  38 ---
 .../python/AbstractPythonFunctionOperator.java     |   6 +-
 .../apache/flink/runtime/memory/MemoryManager.java | 348 +++++++++------------
 .../flink/runtime/taskexecutor/slot/TaskSlot.java  |   6 +-
 .../flink/runtime/util/KeyedBudgetManager.java     | 294 -----------------
 .../flink/runtime/memory/MemoryManagerBuilder.java |  19 +-
 .../memory/MemoryManagerSharedResourcesTest.java   |  10 +-
 .../flink/runtime/memory/MemoryManagerTest.java    | 112 +++----
 .../operators/testutils/MockEnvironment.java       |   3 +-
 .../flink/runtime/util/KeyedBudgetManagerTest.java | 262 ----------------
 .../runtime/tasks/StreamMockEnvironment.java       |   3 +-
 11 files changed, 190 insertions(+), 911 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java
deleted file mode 100644
index 804f00d..0000000
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * The class of memory, such as heap or off-heap.
- */
-@Internal
-public enum MemoryType {
-
-       /**
-        * Denotes memory that is part of the Java heap.
-        */
-       HEAP,
-
-       /**
-        * Denotes memory that is outside the Java heap (but still part of tha 
Java process).
-        */
-       OFF_HEAP
-}
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 33fb35d..6f94043 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.operators.python;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.python.PythonConfig;
 import org.apache.flink.python.PythonFunctionRunner;
 import org.apache.flink.python.PythonOptions;
@@ -172,8 +171,7 @@ public abstract class AbstractPythonFunctionOperator<IN, 
OUT>
                                pythonFunctionRunner = null;
                        }
                        if (reservedMemory > 0) {
-                               
getContainingTask().getEnvironment().getMemoryManager().releaseMemory(
-                                       this, MemoryType.OFF_HEAP, 
reservedMemory);
+                               
getContainingTask().getEnvironment().getMemoryManager().releaseMemory(this, 
reservedMemory);
                                reservedMemory = -1;
                        }
                } finally {
@@ -277,7 +275,7 @@ public abstract class AbstractPythonFunctionOperator<IN, 
OUT>
                long availableManagedMemory = memoryManager.computeMemorySize(
                        getOperatorConfig().getManagedMemoryFraction());
                if (requiredPythonWorkerMemory <= availableManagedMemory) {
-                       memoryManager.reserveMemory(this, MemoryType.OFF_HEAP, 
requiredPythonWorkerMemory);
+                       memoryManager.reserveMemory(this, 
requiredPythonWorkerMemory);
                        LOG.info("Reserved memory {} for Python worker.", 
requiredPythonWorkerMemory);
                        this.reservedMemory = requiredPythonWorkerMemory;
                        // TODO enforce the memory limit of the Python worker
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 3bcfa7c..72df659 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -21,9 +21,6 @@ package org.apache.flink.runtime.memory;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.memory.HybridMemorySegment;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.util.KeyedBudgetManager;
-import org.apache.flink.runtime.util.KeyedBudgetManager.AcquisitionResult;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.LongFunctionWithException;
@@ -32,41 +29,34 @@ import org.apache.flink.util.function.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnegative;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.ConcurrentModificationException;
-import java.util.EnumMap;
-import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import static 
org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapUnsafeMemory;
-import static 
org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment;
 
 /**
- * The memory manager governs the memory that Flink uses for sorting, hashing, 
and caching. Memory is represented
- * either in {@link MemorySegment}s of equal size and arbitrary type or in 
reserved chunks of certain size and {@link MemoryType}.
- * Operators allocate the memory either by requesting a number of memory 
segments or by reserving chunks.
+ * The memory manager governs the memory that Flink uses for sorting, hashing, 
caching or off-heap state backends
+ * (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of 
equal size or in reserved chunks of certain
+ * size. Operators allocate the memory either by requesting a number of memory 
segments or by reserving chunks.
  * Any allocated memory has to be released to be reused later.
  *
- * <p>Which {@link MemoryType}s the MemoryManager serves and their total sizes 
can be passed as an argument
- * to the constructor.
- *
- * <p>The memory segments may be represented as on-heap byte arrays or as 
off-heap memory regions
- * (both via {@link HybridMemorySegment}). Releasing a memory segment will 
make it re-claimable
- * by the garbage collector.
+ * <p>The memory segments are represented as off-heap unsafe memory regions 
(both via {@link HybridMemorySegment}).
+ * Releasing a memory segment will make it re-claimable by the garbage 
collector, but does not necessarily immediately
+ * releases the underlying memory.
  */
 public class MemoryManager {
 
@@ -83,9 +73,15 @@ public class MemoryManager {
        private final Map<Object, Set<MemorySegment>> allocatedSegments;
 
        /** Reserved memory per memory owner. */
-       private final Map<Object, Map<MemoryType, Long>> reservedMemory;
+       private final Map<Object, Long> reservedMemory;
+
+       private final long pageSize;
+
+       private final long totalMemorySize;
+
+       private final long totalNumberOfPages;
 
-       private final KeyedBudgetManager<MemoryType> budgetByType;
+       private final AtomicLong availableMemorySize;
 
        private final SharedResources sharedResources;
 
@@ -93,31 +89,30 @@ public class MemoryManager {
        private volatile boolean isShutDown;
 
        /**
-        * Creates a memory manager with the given memory types, capacity and 
given page size.
+        * Creates a memory manager with the given capacity and given page size.
         *
-        * @param memorySizeByType The total size of the memory to be managed 
by this memory manager for each type (heap / off-heap).
+        * @param memorySize The total size of the off-heap memory to be 
managed by this memory manager.
         * @param pageSize The size of the pages handed out by the memory 
manager.
         */
-       public MemoryManager(Map<MemoryType, Long> memorySizeByType, int 
pageSize) {
-               for (Entry<MemoryType, Long> sizeForType : 
memorySizeByType.entrySet()) {
-                       sanityCheck(sizeForType.getValue(), pageSize, 
sizeForType.getKey());
-               }
+       public MemoryManager(long memorySize, int pageSize) {
+               sanityCheck(memorySize, pageSize);
 
+               this.pageSize = pageSize;
+               this.totalMemorySize = memorySize;
+               this.totalNumberOfPages = memorySize / pageSize;
                this.allocatedSegments = new ConcurrentHashMap<>();
                this.reservedMemory = new ConcurrentHashMap<>();
-               this.budgetByType = new KeyedBudgetManager<>(memorySizeByType, 
pageSize);
+               this.availableMemorySize = new AtomicLong(totalMemorySize);
                this.sharedResources = new SharedResources();
-               verifyIntTotalNumberOfPages(memorySizeByType, 
budgetByType.maxTotalNumberOfPages());
+               verifyIntTotalNumberOfPages(totalMemorySize, 
totalNumberOfPages);
 
                LOG.debug(
-                       "Initialized MemoryManager with total memory size {} 
({}), page size {}.",
-                       budgetByType.totalAvailableBudget(),
-                       memorySizeByType,
+                       "Initialized MemoryManager with total memory size {} 
and page size {}.",
+                       memorySize,
                        pageSize);
        }
 
-       private static void sanityCheck(long memorySize, int pageSize, 
MemoryType memoryType) {
-               Preconditions.checkNotNull(memoryType);
+       private static void sanityCheck(long memorySize, int pageSize) {
                Preconditions.checkArgument(memorySize >= 0L, "Size of total 
memory must be non-negative.");
                Preconditions.checkArgument(
                        pageSize >= MIN_PAGE_SIZE,
@@ -127,12 +122,13 @@ public class MemoryManager {
                        "The given page size is not a power of two.");
        }
 
-       private static void verifyIntTotalNumberOfPages(Map<MemoryType, Long> 
memorySizeByType, long numberOfPagesLong) {
+       private static void verifyIntTotalNumberOfPages(long memorySize, long 
numberOfPagesLong) {
                Preconditions.checkArgument(
                        numberOfPagesLong <= Integer.MAX_VALUE,
-                       "The given number of memory bytes (%d: %s) corresponds 
to more than MAX_INT pages.",
+                       "The given number of memory bytes (%d) corresponds to 
more than MAX_INT pages (%d > %d).",
+                       memorySize,
                        numberOfPagesLong,
-                       memorySizeByType);
+                       Integer.MAX_VALUE);
        }
 
        // 
------------------------------------------------------------------------
@@ -150,7 +146,7 @@ public class MemoryManager {
                        // mark as shutdown and release memory
                        isShutDown = true;
                        reservedMemory.clear();
-                       budgetByType.releaseAll();
+                       availableMemorySize.set(totalMemorySize);
 
                        // go over all allocated segments and release them
                        for (Set<MemorySegment> segments : 
allocatedSegments.values()) {
@@ -179,7 +175,7 @@ public class MemoryManager {
         * @return True, if the memory manager is empty and valid, false if it 
is not empty or corrupted.
         */
        public boolean verifyEmpty() {
-               return budgetByType.totalAvailableBudget() == 
budgetByType.maxTotalBudget();
+               return availableMemorySize.get() == totalMemorySize;
        }
 
        // 
------------------------------------------------------------------------
@@ -189,8 +185,7 @@ public class MemoryManager {
        /**
         * Allocates a set of memory segments from this memory manager.
         *
-        * <p>The returned segments can have any memory type. The total 
allocated memory for each type will not exceed its
-        * size limit, announced in the constructor.
+        * <p>The total allocated memory will not exceed its size limit, 
announced in the constructor.
         *
         * @param owner The owner to associate with the memory segment, for the 
fallback release.
         * @param numPages The number of pages to allocate.
@@ -209,8 +204,7 @@ public class MemoryManager {
        /**
         * Allocates a set of memory segments from this memory manager.
         *
-        * <p>The allocated segments can have any memory type. The total 
allocated memory for each type will not exceed its
-        * size limit, announced in the constructor.
+        * <p>The total allocated memory will not exceed its size limit, 
announced in the constructor.
         *
         * @param owner The owner to associate with the memory segment, for the 
fallback release.
         * @param target The list into which to put the allocated memory pages.
@@ -226,7 +220,6 @@ public class MemoryManager {
                        int numberOfPages) throws MemoryAllocationException {
                allocatePages(AllocationRequest
                        .newBuilder(owner)
-                       .ofAllTypes()
                        .numberOfPages(numberOfPages)
                        .withOutput(target)
                        .build());
@@ -251,30 +244,31 @@ public class MemoryManager {
                // sanity check
                Preconditions.checkNotNull(owner, "The memory owner must not be 
null.");
                Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
+               Preconditions.checkArgument(
+                       numberOfPages <= totalNumberOfPages,
+                       "Cannot allocate more segments %d than the max number 
%d",
+                       numberOfPages,
+                       totalNumberOfPages);
 
                // reserve array space, if applicable
                if (target instanceof ArrayList) {
                        ((ArrayList<MemorySegment>) 
target).ensureCapacity(numberOfPages);
                }
 
-               AcquisitionResult<MemoryType> acquiredBudget = 
budgetByType.acquirePagedBudget(request.getTypes(), numberOfPages);
-               if (acquiredBudget.isFailure()) {
-                       throw new MemoryAllocationException(
-                               String.format(
-                                       "Could not allocate %d pages. Only %d 
pages are remaining.",
-                                       numberOfPages,
-                                       
acquiredBudget.getTotalAvailableForAllQueriedKeys()));
+               long memoryToReserve = numberOfPages * pageSize;
+               try {
+                       reserveMemory(memoryToReserve);
+               } catch (MemoryReservationException e) {
+                       throw new 
MemoryAllocationException(String.format("Could not allocate %d pages", 
numberOfPages), e);
                }
 
                allocatedSegments.compute(owner, (o, currentSegmentsForOwner) 
-> {
                        Set<MemorySegment> segmentsForOwner = 
currentSegmentsForOwner == null ?
                                new HashSet<>(numberOfPages) : 
currentSegmentsForOwner;
-                       for (MemoryType memoryType : 
acquiredBudget.getAcquiredPerKey().keySet()) {
-                               for (long i = 
acquiredBudget.getAcquiredPerKey().get(memoryType); i > 0; i--) {
-                                       MemorySegment segment = 
allocateManagedSegment(memoryType, owner);
-                                       target.add(segment);
-                                       segmentsForOwner.add(segment);
-                               }
+                       for (long i = numberOfPages; i > 0; i--) {
+                               MemorySegment segment = 
allocateOffHeapUnsafeMemory(getPageSize(), owner);
+                               target.add(segment);
+                               segmentsForOwner.add(segment);
                        }
                        return segmentsForOwner;
                });
@@ -289,10 +283,9 @@ public class MemoryManager {
         *
         * <p>If the segment has already been released, it is only freed. If it 
is null or has no owner, the request is simply ignored.
         * The segment is only freed and made eligible for reclamation by the 
GC. The segment will be returned to
-        * the memory pool of its type, increasing its available limit for the 
later allocations.
+        * the memory pool, increasing its available limit for the later 
allocations.
         *
         * @param segment The segment to be released.
-        * @throws IllegalArgumentException Thrown, if the given segment is of 
an incompatible type.
         */
        public void release(MemorySegment segment) {
                Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
@@ -307,9 +300,8 @@ public class MemoryManager {
                        allocatedSegments.computeIfPresent(segment.getOwner(), 
(o, segsForOwner) -> {
                                segment.free();
                                if (segsForOwner.remove(segment)) {
-                                       
budgetByType.releasePageForKey(getSegmentType(segment));
+                                       releaseMemory(getPageSize());
                                }
-                               //noinspection ReturnOfNull
                                return segsForOwner.isEmpty() ? null : 
segsForOwner;
                        });
                }
@@ -322,10 +314,9 @@ public class MemoryManager {
         * Tries to release many memory segments together.
         *
         * <p>The segment is only freed and made eligible for reclamation by 
the GC. Each segment will be returned to
-        * the memory pool of its type, increasing its available limit for the 
later allocations.
+        * the memory pool, increasing its available limit for the later 
allocations.
         *
         * @param segments The segments to be released.
-        * @throws IllegalArgumentException Thrown, if the segments are of an 
incompatible type.
         */
        public void release(Collection<MemorySegment> segments) {
                if (segments == null) {
@@ -334,7 +325,7 @@ public class MemoryManager {
 
                Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
-               EnumMap<MemoryType, Long> releasedMemory = new 
EnumMap<>(MemoryType.class);
+               AtomicLong releasedMemory = new AtomicLong(0L);
 
                // since concurrent modifications to the collection
                // can disturb the release, we need to try potentially multiple 
times
@@ -365,17 +356,17 @@ public class MemoryManager {
                        }
                } while (!successfullyReleased);
 
-               budgetByType.releaseBudgetForKeys(releasedMemory);
+               releaseMemory(releasedMemory.get());
        }
 
        private MemorySegment releaseSegmentsForOwnerUntilNextOwner(
                        MemorySegment firstSeg,
                        Iterator<MemorySegment> segmentsIterator,
-                       EnumMap<MemoryType, Long> releasedMemory) {
+                       AtomicLong releasedMemory) {
                AtomicReference<MemorySegment> nextOwnerMemorySegment = new 
AtomicReference<>();
                Object owner = firstSeg.getOwner();
                allocatedSegments.compute(owner, (o, segsForOwner) -> {
-                       freeSegment(firstSeg, segsForOwner, releasedMemory);
+                       releasedMemory.addAndGet(freeSegment(firstSeg, 
segsForOwner));
                        while (segmentsIterator.hasNext()) {
                                MemorySegment segment = segmentsIterator.next();
                                try {
@@ -387,26 +378,20 @@ public class MemoryManager {
                                                
nextOwnerMemorySegment.set(segment);
                                                break;
                                        }
-                                       freeSegment(segment, segsForOwner, 
releasedMemory);
+                                       
releasedMemory.addAndGet(freeSegment(segment, segsForOwner));
                                } catch (Throwable t) {
                                        throw new RuntimeException(
                                                "Error removing book-keeping 
reference to allocated memory segment.", t);
                                }
                        }
-                       //noinspection ReturnOfNull
                        return segsForOwner == null || segsForOwner.isEmpty() ? 
null : segsForOwner;
                });
                return nextOwnerMemorySegment.get();
        }
 
-       private void freeSegment(
-                       MemorySegment segment,
-                       @Nullable Collection<MemorySegment> segments,
-                       EnumMap<MemoryType, Long> releasedMemory) {
+       private long freeSegment(MemorySegment segment, @Nullable 
Collection<MemorySegment> segments) {
                segment.free();
-               if (segments != null && segments.remove(segment)) {
-                       releaseSegment(segment, releasedMemory);
-               }
+               return segments != null && segments.remove(segment) ? 
getPageSize() : 0L;
        }
 
        /**
@@ -430,125 +415,92 @@ public class MemoryManager {
                }
 
                // free each segment
-               EnumMap<MemoryType, Long> releasedMemory = new 
EnumMap<>(MemoryType.class);
+               long releasedMemory = 0L;
                for (MemorySegment segment : segments) {
                        segment.free();
-                       releaseSegment(segment, releasedMemory);
+                       releasedMemory += getPageSize();
                }
-               budgetByType.releaseBudgetForKeys(releasedMemory);
+               releaseMemory(releasedMemory);
 
                segments.clear();
        }
 
        /**
-        * Reserves memory of a certain type for an owner from this memory 
manager.
+        * Reserves a memory chunk of a certain size for an owner from this 
memory manager.
         *
         * @param owner The owner to associate with the memory reservation, for 
the fallback release.
-        * @param memoryType type of memory to reserve (heap / off-heap).
         * @param size size of memory to reserve.
         * @throws MemoryReservationException Thrown, if this memory manager 
does not have the requested amount
         *                                    of memory any more.
         */
-       public void reserveMemory(Object owner, MemoryType memoryType, long 
size) throws MemoryReservationException {
-               checkMemoryReservationPreconditions(owner, memoryType, size);
+       public void reserveMemory(Object owner, long size) throws 
MemoryReservationException {
+               checkMemoryReservationPreconditions(owner, size);
                if (size == 0L) {
                        return;
                }
 
-               long acquiredMemory = 
budgetByType.acquireBudgetForKey(memoryType, size);
-               if (acquiredMemory < size) {
-                       throw new MemoryReservationException(
-                               String.format("Could not allocate %d bytes. 
Only %d bytes are remaining.", size, acquiredMemory));
-               }
+               reserveMemory(size);
 
-               reservedMemory.compute(owner, (o, reservations) -> {
-                       Map<MemoryType, Long> newReservations = reservations;
-                       if (reservations == null) {
-                               newReservations = new 
EnumMap<>(MemoryType.class);
-                               newReservations.put(memoryType, size);
-                       } else {
-                               reservations.compute(
-                                       memoryType,
-                                       (mt, currentlyReserved) -> 
currentlyReserved == null ? size : currentlyReserved + size);
-                       }
-                       return newReservations;
-               });
+               reservedMemory.compute(owner, (o, memoryReservedForOwner) ->
+                       memoryReservedForOwner == null ? size : 
memoryReservedForOwner + size);
 
                Preconditions.checkState(!isShutDown, "Memory manager has been 
concurrently shut down.");
        }
 
        /**
-        * Releases memory of a certain type from an owner to this memory 
manager.
+        * Releases a memory chunk of a certain size from an owner to this 
memory manager.
         *
         * @param owner The owner to associate with the memory reservation, for 
the fallback release.
-        * @param memoryType type of memory to release (heap / off-heap).
         * @param size size of memory to release.
         */
-       public void releaseMemory(Object owner, MemoryType memoryType, long 
size) {
-               checkMemoryReservationPreconditions(owner, memoryType, size);
+       public void releaseMemory(Object owner, long size) {
+               checkMemoryReservationPreconditions(owner, size);
                if (size == 0L) {
                        return;
                }
 
-               reservedMemory.compute(owner, (o, reservations) -> {
-                       if (reservations != null) {
-                               reservations.compute(
-                                       memoryType,
-                                       (mt, currentlyReserved) -> {
-                                               long newReservedMemory = 0;
-                                               if (currentlyReserved != null) {
-                                                       if (currentlyReserved < 
size) {
-                                                               LOG.warn(
-                                                                       "Trying 
to release more memory {} than it was reserved {} so far for the owner {}",
-                                                                       size,
-                                                                       
currentlyReserved,
-                                                                       owner);
-                                                       }
-
-                                                       newReservedMemory = 
releaseAndCalculateReservedMemory(size, memoryType, currentlyReserved);
-                                               }
-
-                                               return newReservedMemory == 0 ? 
null : newReservedMemory;
-                                       });
+               reservedMemory.compute(owner, (o, currentlyReserved) -> {
+                       long newReservedMemory = 0;
+                       if (currentlyReserved != null) {
+                               if (currentlyReserved < size) {
+                                       LOG.warn(
+                                               "Trying to release more memory 
{} than it was reserved {} so far for the owner {}",
+                                               size,
+                                               currentlyReserved,
+                                               owner);
+                               }
+
+                               newReservedMemory = 
releaseAndCalculateReservedMemory(size, currentlyReserved);
                        }
-                       //noinspection ReturnOfNull
-                       return reservations == null || reservations.isEmpty() ? 
null : reservations;
+
+                       return newReservedMemory == 0 ? null : 
newReservedMemory;
                });
        }
 
-       private long releaseAndCalculateReservedMemory(long memoryToFree, 
MemoryType memoryType, long currentlyReserved) {
+       private long releaseAndCalculateReservedMemory(long memoryToFree, long 
currentlyReserved) {
                final long effectiveMemoryToRelease = 
Math.min(currentlyReserved, memoryToFree);
-               budgetByType.releaseBudgetForKey(memoryType, 
effectiveMemoryToRelease);
+               releaseMemory(effectiveMemoryToRelease);
 
                return currentlyReserved - effectiveMemoryToRelease;
        }
 
-       private void checkMemoryReservationPreconditions(Object owner, 
MemoryType memoryType, long size) {
+       private void checkMemoryReservationPreconditions(Object owner, long 
size) {
                Preconditions.checkNotNull(owner, "The memory owner must not be 
null.");
-               Preconditions.checkNotNull(memoryType, "The memory type must 
not be null.");
                Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
                Preconditions.checkArgument(size >= 0L, "The memory size (%s) 
has to have non-negative size", size);
        }
 
        /**
-        * Releases all memory of a certain type from an owner to this memory 
manager.
+        * Releases all reserved memory chunks from an owner to this memory 
manager.
         *
         * @param owner The owner to associate with the memory reservation, for 
the fallback release.
-        * @param memoryType type of memory to release (heap / off-heap).
         */
-       public void releaseAllMemory(Object owner, MemoryType memoryType) {
-               checkMemoryReservationPreconditions(owner, memoryType, 0L);
-
-               reservedMemory.compute(owner, (o, reservations) -> {
-                       if (reservations != null) {
-                               Long size = reservations.remove(memoryType);
-                               if (size != null) {
-                                       
budgetByType.releaseBudgetForKey(memoryType, size);
-                               }
-                       }
-                       //noinspection ReturnOfNull
-                       return reservations == null || reservations.isEmpty() ? 
null : reservations;
-               });
+       public void releaseAllMemory(Object owner) {
+               checkMemoryReservationPreconditions(owner, 0L);
+               Long memoryReservedForOwner = reservedMemory.remove(owner);
+               if (memoryReservedForOwner != null) {
+                       releaseMemory(memoryReservedForOwner);
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -598,7 +550,7 @@ public class MemoryManager {
                // and release should happen
                final LongFunctionWithException<T, Exception> 
reserveAndInitialize = (size) -> {
                        try {
-                               reserveMemory(type, MemoryType.OFF_HEAP, size);
+                               reserveMemory(type, size);
                        } catch (MemoryReservationException e) {
                                throw new MemoryAllocationException("Could not 
created the shared memory resource of size " + size +
                                        ". Not enough memory left to reserve 
from the slot's managed memory.", e);
@@ -607,7 +559,7 @@ public class MemoryManager {
                        return initializer.apply(size);
                };
 
-               final Consumer<Long> releaser = (size) -> releaseMemory(type, 
MemoryType.OFF_HEAP, size);
+               final Consumer<Long> releaser = (size) -> releaseMemory(type, 
size);
 
                // This object identifies the lease in this request. It is used 
only to identify the release operation.
                // Using the object to represent the lease is a bit nicer safer 
than just using a reference counter.
@@ -663,7 +615,7 @@ public class MemoryManager {
         */
        public int getPageSize() {
                //noinspection NumericCastThatLosesPrecision
-               return (int) budgetByType.getDefaultPageSize();
+               return (int) pageSize;
        }
 
        /**
@@ -672,27 +624,16 @@ public class MemoryManager {
         * @return The total size of memory.
         */
        public long getMemorySize() {
-               return budgetByType.maxTotalBudget();
-       }
-
-       /**
-        * Returns the total size of the certain type of memory handled by this 
memory manager.
-        *
-        * @param memoryType The type of memory.
-        * @return The total size of memory.
-        */
-       public long getMemorySizeByType(MemoryType memoryType) {
-               return budgetByType.maxTotalBudgetForKey(memoryType);
+               return totalMemorySize;
        }
 
        /**
         * Returns the total size of the certain type of memory handled by this 
memory manager.
         *
-        * @param memoryType The type of memory.
-        * @return The total size of memory.
+        * @return The available amount of memory.
         */
-       public long availableMemory(MemoryType memoryType) {
-               return budgetByType.availableBudgetForKey(memoryType);
+       public long availableMemory() {
+               return availableMemorySize.get();
        }
 
        /**
@@ -709,7 +650,7 @@ public class MemoryManager {
                }
 
                //noinspection NumericCastThatLosesPrecision
-               return (int) (budgetByType.maxTotalNumberOfPages() * fraction);
+               return (int) (totalNumberOfPages * fraction);
        }
 
        /**
@@ -723,26 +664,45 @@ public class MemoryManager {
                        fraction > 0 && fraction <= 1,
                        "The fraction of memory to allocate must within (0, 1], 
was: %s", fraction);
 
-               return (long) (budgetByType.maxTotalBudget() * fraction);
+               //noinspection NumericCastThatLosesPrecision
+               return (long) Math.floor(totalMemorySize * fraction);
        }
 
-       private MemorySegment allocateManagedSegment(MemoryType memoryType, 
Object owner) {
-               switch (memoryType) {
-                       case HEAP:
-                               return allocateUnpooledSegment(getPageSize(), 
owner);
-                       case OFF_HEAP:
-                               return 
allocateOffHeapUnsafeMemory(getPageSize(), owner);
-                       default:
-                               throw new 
IllegalArgumentException("unrecognized memory type: " + memoryType);
+       private void reserveMemory(long size) throws MemoryReservationException 
{
+               long availableOrReserved = tryReserveMemory(size);
+               if (availableOrReserved < size) {
+                       throw new MemoryReservationException(
+                               String.format("Could not allocate %d bytes, 
only %d bytes are remaining", size, availableOrReserved));
                }
        }
 
-       private void releaseSegment(MemorySegment segment, EnumMap<MemoryType, 
Long> releasedMemory) {
-               releasedMemory.compute(getSegmentType(segment), (t, v) -> v == 
null ? getPageSize() : v + getPageSize());
+       private long tryReserveMemory(long size) {
+               long currentAvailableMemorySize;
+               while (size <= (currentAvailableMemorySize = 
availableMemorySize.get())) {
+                       if 
(availableMemorySize.compareAndSet(currentAvailableMemorySize, 
currentAvailableMemorySize - size)) {
+                               return size;
+                       }
+               }
+               return currentAvailableMemorySize;
        }
 
-       private static MemoryType getSegmentType(MemorySegment segment) {
-               return segment.isOffHeap() ? MemoryType.OFF_HEAP : 
MemoryType.HEAP;
+       private void releaseMemory(@Nonnegative long size) {
+               if (size == 0) {
+                       return;
+               }
+               boolean released = false;
+               long currentAvailableMemorySize = 0L;
+               while (!released && totalMemorySize >= 
(currentAvailableMemorySize = availableMemorySize.get()) + size) {
+                       released = availableMemorySize
+                               .compareAndSet(currentAvailableMemorySize, 
currentAvailableMemorySize + size);
+               }
+               if (!released) {
+                       throw new IllegalStateException(String.format(
+                               "Trying to release more managed memory (%d 
bytes) than has been allocated (%d bytes), the total size is %d bytes",
+                               size,
+                               currentAvailableMemorySize,
+                               totalMemorySize));
+               }
        }
 
        /** Memory segment allocation request. */
@@ -757,18 +717,13 @@ public class MemoryManager {
                /** Number of pages to allocate. */
                private final int numberOfPages;
 
-               /** Allowed types of memory to allocate. */
-               private final Set<MemoryType> types;
-
                private AllocationRequest(
                                Object owner,
                                Collection<MemorySegment> output,
-                               int numberOfPages,
-                               Set<MemoryType> types) {
+                               int numberOfPages) {
                        this.owner = owner;
                        this.output = output;
                        this.numberOfPages = numberOfPages;
-                       this.types = types;
                }
 
                public Object getOwner() {
@@ -779,20 +734,12 @@ public class MemoryManager {
                        return numberOfPages;
                }
 
-               public Set<MemoryType> getTypes() {
-                       return Collections.unmodifiableSet(types);
-               }
-
                public static Builder newBuilder(Object owner) {
                        return new Builder(owner);
                }
 
-               public static AllocationRequest ofAllTypes(Object owner, int 
numberOfPages) {
-                       return 
newBuilder(owner).ofAllTypes().numberOfPages(numberOfPages).build();
-               }
-
-               public static AllocationRequest ofType(Object owner, int 
numberOfPages, MemoryType type) {
-                       return 
newBuilder(owner).ofType(type).numberOfPages(numberOfPages).build();
+               public static AllocationRequest forOf(Object owner, int 
numberOfPages) {
+                       return 
newBuilder(owner).numberOfPages(numberOfPages).build();
                }
        }
 
@@ -802,7 +749,6 @@ public class MemoryManager {
                private final Object owner;
                private Collection<MemorySegment> output = new ArrayList<>();
                private int numberOfPages = 1;
-               private Set<MemoryType> types = 
EnumSet.noneOf(MemoryType.class);
 
                public Builder(Object owner) {
                        this.owner = owner;
@@ -819,18 +765,8 @@ public class MemoryManager {
                        return this;
                }
 
-               public Builder ofType(MemoryType type) {
-                       types.add(type);
-                       return this;
-               }
-
-               public Builder ofAllTypes() {
-                       types = EnumSet.allOf(MemoryType.class);
-                       return this;
-               }
-
                public AllocationRequest build() {
-                       return new AllocationRequest(owner, output, 
numberOfPages, types);
+                       return new AllocationRequest(owner, output, 
numberOfPages);
                }
        }
 
@@ -839,8 +775,6 @@ public class MemoryManager {
        // 
------------------------------------------------------------------------
 
        public static MemoryManager forDefaultPageSize(long size) {
-               final Map<MemoryType, Long> memorySizes = new HashMap<>();
-               memorySizes.put(MemoryType.OFF_HEAP, size);
-               return new MemoryManager(memorySizes, DEFAULT_PAGE_SIZE);
+               return new MemoryManager(size, DEFAULT_PAGE_SIZE);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index 779d6a9..1dc569f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.taskexecutor.slot;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -32,7 +31,6 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -316,8 +314,6 @@ public class TaskSlot<T extends TaskSlotPayload> implements 
AutoCloseableAsync {
        }
 
        private static MemoryManager createMemoryManager(ResourceProfile 
resourceProfile, int pageSize) {
-               Map<MemoryType, Long> memorySizeByType =
-                       Collections.singletonMap(MemoryType.OFF_HEAP, 
resourceProfile.getManagedMemory().getBytes());
-               return new MemoryManager(memorySizeByType, pageSize);
+               return new 
MemoryManager(resourceProfile.getManagedMemory().getBytes(), pageSize);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java
deleted file mode 100644
index f7d0855..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/KeyedBudgetManager.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * Manages {@code long} available budget per key (allocation/release).
- *
- * <p>This manager gets a certain maximum {@code long} budget per key.
- * Users can acquire some budget for some key and release it later.
- * The manager keeps track of acquired/released budget and prevents from 
over-allocating.
- *
- * <p>There is also a paged type of allocation where a certain number of pages 
can be acquired from a set of keys.
- * The page has its budget size. The manager acquires randomly from all keys 
of a given set.
- * At the end, sum of pages acquired from each key is either requested number 
of pages or none.
- * Only integer number of pages are acquired from each key respecting its 
available budget (no page spans two or more keys)
- * or nothing is acquired reporting the maximum number of pages which could be 
acquired per each given key at the moment.
- *
- * @param <K> type of the budget key
- */
-@ThreadSafe
-public class KeyedBudgetManager<K> {
-       private final Map<K, Long> maxBudgetByKey;
-
-       private final long defaultPageSize;
-
-       private final long totalNumberOfPages;
-
-       @GuardedBy("lock")
-       private final Map<K, Long> availableBudgetByKey;
-
-       private final Object lock = new Object();
-
-       public KeyedBudgetManager(Map<K, Long> maxBudgetByKey, long 
defaultPageSize) {
-               Preconditions.checkNotNull(maxBudgetByKey);
-               Preconditions.checkArgument(defaultPageSize > 0L, "The default 
page size has to be greater than zero");
-
-               this.maxBudgetByKey = new HashMap<>(maxBudgetByKey);
-               this.availableBudgetByKey = new HashMap<>(maxBudgetByKey);
-               this.defaultPageSize = defaultPageSize;
-               this.totalNumberOfPages = 
calculateTotalNumberOfPages(maxBudgetByKey, defaultPageSize);
-       }
-
-       public long getDefaultPageSize() {
-               return defaultPageSize;
-       }
-
-       /**
-        * Tries to acquire budget for a given key.
-        *
-        * <p>No budget is acquired if it was not possible to fully acquire the 
requested budget.
-        *
-        * @param key the key to acquire budget from
-        * @param size the size of budget to acquire from the given key
-        * @return the fully acquired budget for the key or max possible budget 
to acquire
-        * if it was not possible to acquire the requested budget.
-        */
-       public long acquireBudgetForKey(K key, long size) {
-               Preconditions.checkNotNull(key);
-               AcquisitionResult<K> result = 
acquirePagedBudgetForKeys(Collections.singletonList(key), size, 1L);
-               return result.isSuccess() ?
-                       result.getAcquiredPerKey().get(key) : 
result.getTotalAvailableForAllQueriedKeys();
-       }
-
-       /**
-        * Tries to acquire budget for given keys which equals to the number of 
pages times default page size.
-        *
-        * <p>See also {@link #acquirePagedBudgetForKeys(Iterable, long, long)}
-        */
-       public AcquisitionResult<K> acquirePagedBudget(Iterable<K> keys, long 
numberOfPages) {
-               return acquirePagedBudgetForKeys(keys, numberOfPages, 
defaultPageSize);
-       }
-
-       /**
-        * Tries to acquire budget which equals to the number of pages times 
page size.
-        *
-        * <p>The budget will be acquired only from the given keys. Only 
integer number of pages will be acquired from each key.
-        * If the next page does not fit into the available budget of some key, 
it will try to be acquired from another key.
-        * The acquisition is successful if the acquired number of pages for 
each key sums up to the requested number of pages.
-        * The function does not make any preference about which keys from the 
given keys to acquire from.
-        *
-        * @param keys the keys to acquire budget from
-        * @param numberOfPages the total number of pages to acquire from the 
given keys
-        * @param pageSize the size of budget to acquire per page
-        * @return the acquired number of pages for each key if the acquisition 
is successful or
-        * the total number of pages which were available for the given keys.
-        */
-       AcquisitionResult<K> acquirePagedBudgetForKeys(Iterable<K> keys, long 
numberOfPages, long pageSize) {
-               Preconditions.checkNotNull(keys);
-               Preconditions.checkArgument(numberOfPages >= 0L, "The requested 
number of pages has to be positive");
-               Preconditions.checkArgument(pageSize > 0L, "The page size has 
to be greater than zero");
-
-               synchronized (lock) {
-                       long leftPagesToReserve = numberOfPages;
-                       Map<K, Long> pagesToReserveByKey = new HashMap<>();
-                       for (K key : keys) {
-                               long availableBudgetOfCurrentKey = 
availableBudgetByKey.getOrDefault(key, 0L);
-                               long availablePagesOfCurrentKey = 
availableBudgetOfCurrentKey / pageSize;
-                               if (leftPagesToReserve <= 
availablePagesOfCurrentKey) {
-                                       pagesToReserveByKey.put(key, 
leftPagesToReserve);
-                                       leftPagesToReserve = 0L;
-                                       break;
-                               } else if (availablePagesOfCurrentKey > 0L) {
-                                       pagesToReserveByKey.put(key, 
availablePagesOfCurrentKey);
-                                       leftPagesToReserve -= 
availablePagesOfCurrentKey;
-                               }
-                       }
-                       boolean possibleToAcquire = leftPagesToReserve == 0L;
-                       if (possibleToAcquire) {
-                               for (Entry<K, Long> pagesToReserveForKey : 
pagesToReserveByKey.entrySet()) {
-                                       //noinspection ConstantConditions
-                                       availableBudgetByKey.compute(
-                                               pagesToReserveForKey.getKey(),
-                                               (k, v) -> v - 
(pagesToReserveForKey.getValue() * pageSize));
-                               }
-                       }
-                       return possibleToAcquire ?
-                               AcquisitionResult.success(pagesToReserveByKey) 
: AcquisitionResult.failure(numberOfPages - leftPagesToReserve);
-               }
-       }
-
-       public void releasePageForKey(K key) {
-               releaseBudgetForKey(key, defaultPageSize);
-       }
-
-       public void releaseBudgetForKey(K key, long size) {
-               Preconditions.checkNotNull(key);
-               Preconditions.checkArgument(size >= 0L, "The budget to release 
has to be positive");
-
-               releaseBudgetForKeys(Collections.singletonMap(key, size));
-       }
-
-       public void releaseBudgetForKeys(Map<K, Long> sizeByKey) {
-               Preconditions.checkNotNull(sizeByKey);
-
-               synchronized (lock) {
-                       for (Entry<K, Long> toReleaseForKey : 
sizeByKey.entrySet()) {
-                               long toRelease = toReleaseForKey.getValue();
-                               Preconditions.checkArgument(
-                                       toRelease >= 0L,
-                                       "The budget to release for key %s has 
to be positive",
-                                       toReleaseForKey.getKey());
-                               if (toRelease == 0L) {
-                                       continue;
-                               }
-                               K keyToReleaseFor = toReleaseForKey.getKey();
-                               long maxBudgetForKey = 
maxBudgetByKey.get(keyToReleaseFor);
-                               availableBudgetByKey.compute(keyToReleaseFor, 
(k, currentBudget) -> {
-                                       if (currentBudget == null) {
-                                               throw new 
IllegalArgumentException("The budget key is not supported: " + keyToReleaseFor);
-                                       } else if (currentBudget + toRelease > 
maxBudgetForKey) {
-                                               throw new IllegalStateException(
-                                                       String.format(
-                                                               "The budget to 
release %d exceeds the limit %d for key %s",
-                                                               toRelease,
-                                                               maxBudgetForKey,
-                                                               
keyToReleaseFor));
-                                       } else {
-                                               return currentBudget + 
toRelease;
-                                       }
-                               });
-                       }
-               }
-       }
-
-       public void releaseAll() {
-               synchronized (lock) {
-                       availableBudgetByKey.putAll(maxBudgetByKey);
-               }
-       }
-
-       public long maxTotalBudget() {
-               return maxBudgetByKey.values().stream().mapToLong(b -> b).sum();
-       }
-
-       public long maxTotalNumberOfPages() {
-               return totalNumberOfPages;
-       }
-
-       public long maxTotalBudgetForKey(K key) {
-               Preconditions.checkNotNull(key);
-               return maxBudgetByKey.get(key);
-       }
-
-       public long totalAvailableBudget() {
-               return availableBudgetForKeys(maxBudgetByKey.keySet());
-       }
-
-       long availableBudgetForKeys(Iterable<K> keys) {
-               Preconditions.checkNotNull(keys);
-               synchronized (lock) {
-                       long totalSize = 0L;
-                       for (K key : keys) {
-                               totalSize += availableBudgetForKey(key);
-                       }
-                       return totalSize;
-               }
-       }
-
-       public long availableBudgetForKey(K key) {
-               Preconditions.checkNotNull(key);
-               synchronized (lock) {
-                       return availableBudgetByKey.getOrDefault(key, 0L);
-               }
-       }
-
-       private static <K> long calculateTotalNumberOfPages(Map<K, Long> 
budgetByType, long pageSize) {
-               long numPages = 0L;
-               for (long sizeForType : budgetByType.values()) {
-                       numPages += sizeForType / pageSize;
-               }
-               return numPages;
-       }
-
-       /**
-        * Result of budget acquisition to return from acquisition functions.
-        *
-        * <p>The result of acquisition is either success: {@link 
AcquisitionResult#isSuccess()} and this class contains
-        * acquired budget/pages per key: {@link 
AcquisitionResult#getAcquiredPerKey()} or
-        * it is failure: {@link AcquisitionResult#isFailure()} and this class 
contains total max available budget for all
-        * queried keys: {@link 
AcquisitionResult#getTotalAvailableForAllQueriedKeys()} which was not enough to
-        * acquire the requested number of pages.
-        */
-       public static class AcquisitionResult<K> {
-               @Nullable
-               private final Map<K, Long> acquiredBudgetPerKey;
-
-               @Nullable
-               private final Long totalAvailableBudgetForAllQueriedKeys;
-
-               private AcquisitionResult(
-                               @Nullable Map<K, Long> acquiredBudgetPerKey,
-                               @Nullable Long 
totalAvailableBudgetForAllQueriedKeys) {
-                       this.acquiredBudgetPerKey = acquiredBudgetPerKey;
-                       this.totalAvailableBudgetForAllQueriedKeys = 
totalAvailableBudgetForAllQueriedKeys;
-               }
-
-               public static <K> AcquisitionResult<K> success(Map<K, Long> 
acquiredBudgetPerKey) {
-                       return new AcquisitionResult<>(acquiredBudgetPerKey, 
null);
-               }
-
-               public static <K> AcquisitionResult<K> failure(long 
totalAvailableBudgetForAllQueriedKeys) {
-                       return new AcquisitionResult<>(null, 
totalAvailableBudgetForAllQueriedKeys);
-               }
-
-               public boolean isSuccess() {
-                       return acquiredBudgetPerKey != null;
-               }
-
-               public boolean isFailure() {
-                       return totalAvailableBudgetForAllQueriedKeys != null;
-               }
-
-               public Map<K, Long> getAcquiredPerKey() {
-                       if (acquiredBudgetPerKey == null) {
-                               throw new IllegalStateException("The 
acquisition failed. Nothing was acquired.");
-                       }
-                       return 
Collections.unmodifiableMap(acquiredBudgetPerKey);
-               }
-
-               public long getTotalAvailableForAllQueriedKeys() {
-                       if (totalAvailableBudgetForAllQueriedKeys == null) {
-                               throw new IllegalStateException("The 
acquisition succeeded. All requested pages were acquired.");
-                       }
-                       return totalAvailableBudgetForAllQueriedKeys;
-               }
-       }
-}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
index 889f9cd..91599ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerBuilder.java
@@ -18,29 +18,19 @@
 
 package org.apache.flink.runtime.memory;
 
-import org.apache.flink.core.memory.MemoryType;
-
-import java.util.EnumMap;
-import java.util.Map;
-
 import static org.apache.flink.runtime.memory.MemoryManager.DEFAULT_PAGE_SIZE;
 
 /** Builder class for {@link MemoryManager}. */
 public class MemoryManagerBuilder {
        private static final long DEFAULT_MEMORY_SIZE = 32L * DEFAULT_PAGE_SIZE;
 
-       private final Map<MemoryType, Long> memoryPools = new 
EnumMap<>(MemoryType.class);
+       private long memorySize = DEFAULT_MEMORY_SIZE;
        private int pageSize = DEFAULT_PAGE_SIZE;
 
        private MemoryManagerBuilder() {}
 
        public MemoryManagerBuilder setMemorySize(long memorySize) {
-               this.memoryPools.put(MemoryType.HEAP, memorySize);
-               return this;
-       }
-
-       public MemoryManagerBuilder setMemorySize(MemoryType memoryType, long 
memorySize) {
-               this.memoryPools.put(memoryType, memorySize);
+               this.memorySize = memorySize;
                return this;
        }
 
@@ -50,10 +40,7 @@ public class MemoryManagerBuilder {
        }
 
        public MemoryManager build() {
-               if (memoryPools.isEmpty()) {
-                       memoryPools.put(MemoryType.HEAP, DEFAULT_MEMORY_SIZE);
-               }
-               return new MemoryManager(memoryPools, pageSize);
+               return new MemoryManager(memorySize, pageSize);
        }
 
        public static MemoryManagerBuilder newBuilder() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
index 9f49fe2..d501684 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerSharedResourcesTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.memory;
 
-import org.apache.flink.core.memory.MemoryType;
-
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -77,18 +75,18 @@ public class MemoryManagerSharedResourcesTest {
 
                memoryManager.getSharedMemoryResourceForManagedMemory("type", 
TestResource::new, 0.5);
 
-               assertEquals(memoryManager.getMemorySize() / 2, 
memoryManager.availableMemory(MemoryType.OFF_HEAP));
+               assertEquals(memoryManager.getMemorySize() / 2, 
memoryManager.availableMemory());
        }
 
        @Test
        public void getExistingDoesNotAllocateAdditionalMemory() throws 
Exception {
                final MemoryManager memoryManager = createMemoryManager();
                memoryManager.getSharedMemoryResourceForManagedMemory("type", 
TestResource::new, 0.8);
-               final long freeMemory = 
memoryManager.availableMemory(MemoryType.OFF_HEAP);
+               final long freeMemory = memoryManager.availableMemory();
 
                memoryManager.getSharedMemoryResourceForManagedMemory("type", 
TestResource::new, 0.8);
 
-               assertEquals(freeMemory, 
memoryManager.availableMemory(MemoryType.OFF_HEAP));
+               assertEquals(freeMemory, memoryManager.availableMemory());
        }
 
        @Test
@@ -222,7 +220,7 @@ public class MemoryManagerSharedResourcesTest {
 
                // this is to guard test assumptions
                assertEquals(size, mm.getMemorySize());
-               assertEquals(size, mm.availableMemory(MemoryType.OFF_HEAP));
+               assertEquals(size, mm.availableMemory());
 
                return mm;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
index 75a3151..6a553df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.memory;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager.AllocationRequest;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
@@ -31,17 +30,11 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.EnumMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 
-import static 
org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.ofAllTypes;
-import static 
org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.ofType;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo;
+import static 
org.apache.flink.runtime.memory.MemoryManager.AllocationRequest.forOf;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 /**
@@ -65,8 +58,7 @@ public class MemoryManagerTest {
        public void setUp() {
                this.memoryManager = MemoryManagerBuilder
                        .newBuilder()
-                       .setMemorySize(MemoryType.HEAP, MEMORY_SIZE / 2)
-                       .setMemorySize(MemoryType.OFF_HEAP, MEMORY_SIZE / 2)
+                       .setMemorySize(MEMORY_SIZE)
                        .setPageSize(PAGE_SIZE)
                        .build();
                this.random = new Random(RANDOM_SEED);
@@ -173,7 +165,7 @@ public class MemoryManagerTest {
 
                        List<MemorySegment> segs = 
this.memoryManager.allocatePages(mockInvoke, NUM_PAGES);
 
-                       testCannotAllocateAnymore(ofAllTypes(mockInvoke, 1));
+                       testCannotAllocateAnymore(forOf(mockInvoke, 1));
 
                        Assert.assertTrue("The previously allocated segments 
were not valid any more.",
                                                                                
                                                        
allMemorySegmentsValid(segs));
@@ -190,13 +182,13 @@ public class MemoryManagerTest {
        public void doubleReleaseReturnsMemoryOnlyOnce() throws 
MemoryAllocationException {
                final AbstractInvokable mockInvoke = new DummyInvokable();
 
-               Collection<MemorySegment> segs = 
this.memoryManager.allocatePages(ofAllTypes(mockInvoke, NUM_PAGES));
+               Collection<MemorySegment> segs = 
this.memoryManager.allocatePages(forOf(mockInvoke, NUM_PAGES));
                MemorySegment segment = segs.iterator().next();
 
                this.memoryManager.release(segment);
                this.memoryManager.release(segment);
 
-               testCannotAllocateAnymore(ofAllTypes(mockInvoke, 2));
+               testCannotAllocateAnymore(forOf(mockInvoke, 2));
 
                this.memoryManager.releaseAll(mockInvoke);
        }
@@ -220,116 +212,86 @@ public class MemoryManagerTest {
        }
 
        @Test
-       @SuppressWarnings("NumericCastThatLosesPrecision")
-       public void testAllocateMixedMemoryType() throws 
MemoryAllocationException {
-               int totalHeapPages = (int) 
memoryManager.getMemorySizeByType(MemoryType.HEAP) / PAGE_SIZE;
-               int totalOffHeapPages = (int) 
memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP) / PAGE_SIZE;
-               int pagesToAllocate =  totalHeapPages + totalOffHeapPages / 2;
-
+       public void testMemoryReservation() throws MemoryReservationException {
                Object owner = new Object();
-               Collection<MemorySegment> segments = 
memoryManager.allocatePages(ofAllTypes(owner, pagesToAllocate));
-               Map<MemoryType, Integer> split = 
calcMemoryTypeSplitForSegments(segments);
-
-               assertThat(split.get(MemoryType.HEAP), 
lessThanOrEqualTo(totalHeapPages));
-               assertThat(split.get(MemoryType.OFF_HEAP), 
lessThanOrEqualTo(totalOffHeapPages));
-               assertThat(split.get(MemoryType.HEAP) + 
split.get(MemoryType.OFF_HEAP), is(pagesToAllocate));
 
-               memoryManager.release(segments);
-       }
-
-       private static Map<MemoryType, Integer> 
calcMemoryTypeSplitForSegments(Iterable<MemorySegment> segments) {
-               int heapPages = 0;
-               int offHeapPages = 0;
-               for (MemorySegment memorySegment : segments) {
-                       if (memorySegment.isOffHeap()) {
-                               offHeapPages++;
-                       } else {
-                               heapPages++;
-                       }
-               }
-               Map<MemoryType, Integer> split = new 
EnumMap<>(MemoryType.class);
-               split.put(MemoryType.HEAP, heapPages);
-               split.put(MemoryType.OFF_HEAP, offHeapPages);
-               return split;
+               memoryManager.reserveMemory(owner, PAGE_SIZE);
+               memoryManager.releaseMemory(owner, PAGE_SIZE);
        }
 
        @Test
-       public void testMemoryReservation() throws MemoryReservationException {
+       public void testAllMemoryReservation() throws 
MemoryReservationException {
                Object owner = new Object();
 
-               memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-               memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, 
memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP));
-
-               memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-               memoryManager.releaseAllMemory(owner, MemoryType.OFF_HEAP);
+               memoryManager.reserveMemory(owner, 
memoryManager.getMemorySize());
+               memoryManager.releaseAllMemory(owner);
        }
 
        @Test
        public void testCannotReserveBeyondTheLimit() throws 
MemoryReservationException {
                Object owner = new Object();
-               memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, 
memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP));
-               testCannotReserveAnymore(MemoryType.OFF_HEAP, 1L);
-               memoryManager.releaseAllMemory(owner, MemoryType.OFF_HEAP);
+               memoryManager.reserveMemory(owner, 
memoryManager.getMemorySize());
+               testCannotReserveAnymore(1L);
+               memoryManager.releaseAllMemory(owner);
        }
 
        @Test
        public void testMemoryTooBigReservation() {
-               long size = memoryManager.getMemorySizeByType(MemoryType.HEAP) 
+ PAGE_SIZE;
-               testCannotReserveAnymore(MemoryType.HEAP, size);
+               long size = memoryManager.getMemorySize() + PAGE_SIZE;
+               testCannotReserveAnymore(size);
        }
 
        @Test
        public void testMemoryReleaseMultipleTimes() throws 
MemoryReservationException {
                Object owner = new Object();
                Object owner2 = new Object();
-               long totalHeapMemorySize = 
memoryManager.availableMemory(MemoryType.HEAP);
+               long totalHeapMemorySize = memoryManager.availableMemory();
                // to prevent memory size exceeding the limit, reserve some 
memory from another owner.
-               memoryManager.reserveMemory(owner2, MemoryType.HEAP, PAGE_SIZE);
+               memoryManager.reserveMemory(owner2, PAGE_SIZE);
 
                // reserve once but release twice
-               memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-               memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-               memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-               long heapMemoryLeft = 
memoryManager.availableMemory(MemoryType.HEAP);
+               memoryManager.reserveMemory(owner, PAGE_SIZE);
+               memoryManager.releaseMemory(owner, PAGE_SIZE);
+               memoryManager.releaseMemory(owner, PAGE_SIZE);
+               long heapMemoryLeft = memoryManager.availableMemory();
                assertEquals("Memory leak happens", totalHeapMemorySize - 
PAGE_SIZE, heapMemoryLeft);
-               memoryManager.releaseAllMemory(owner2, MemoryType.HEAP);
+               memoryManager.releaseAllMemory(owner2);
        }
 
        @Test
        public void testMemoryReleaseMoreThanReserved() throws 
MemoryReservationException {
                Object owner = new Object();
                Object owner2 = new Object();
-               long totalHeapMemorySize = 
memoryManager.availableMemory(MemoryType.HEAP);
+               long totalHeapMemorySize = memoryManager.availableMemory();
                // to prevent memory size exceeding the limit, reserve some 
memory from another owner.
-               memoryManager.reserveMemory(owner2, MemoryType.HEAP, PAGE_SIZE);
+               memoryManager.reserveMemory(owner2, PAGE_SIZE);
 
                // release more than reserved size
-               memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
-               memoryManager.releaseMemory(owner, MemoryType.HEAP, PAGE_SIZE * 
2);
-               long heapMemoryLeft = 
memoryManager.availableMemory(MemoryType.HEAP);
+               memoryManager.reserveMemory(owner, PAGE_SIZE);
+               memoryManager.releaseMemory(owner, PAGE_SIZE * 2);
+               long heapMemoryLeft = memoryManager.availableMemory();
                assertEquals("Memory leak happens", totalHeapMemorySize - 
PAGE_SIZE, heapMemoryLeft);
-               memoryManager.releaseAllMemory(owner2, MemoryType.HEAP);
+               memoryManager.releaseAllMemory(owner2);
        }
 
        @Test
        public void testMemoryAllocationAndReservation() throws 
MemoryAllocationException, MemoryReservationException {
-               MemoryType type = MemoryType.OFF_HEAP;
                @SuppressWarnings("NumericCastThatLosesPrecision")
-               int totalPagesForType = (int) 
memoryManager.getMemorySizeByType(type) / PAGE_SIZE;
+               int totalPagesForType = (int) memoryManager.getMemorySize() / 
PAGE_SIZE;
 
                // allocate half memory for segments
                Object owner1 = new Object();
-               memoryManager.allocatePages(ofType(owner1, totalPagesForType / 
2, MemoryType.OFF_HEAP));
+               memoryManager.allocatePages(forOf(owner1, totalPagesForType / 
2));
 
                // reserve the other half of memory
                Object owner2 = new Object();
-               memoryManager.reserveMemory(owner2, type, (long) PAGE_SIZE * 
totalPagesForType / 2);
+               memoryManager.reserveMemory(owner2, (long) PAGE_SIZE * 
totalPagesForType / 2);
 
-               testCannotAllocateAnymore(ofType(new Object(), 1, type));
-               testCannotReserveAnymore(type, 1L);
+               testCannotAllocateAnymore(forOf(new Object(), 1));
+               testCannotReserveAnymore(1L);
 
                memoryManager.releaseAll(owner1);
-               memoryManager.releaseAllMemory(owner2, type);
+               memoryManager.releaseAllMemory(owner2);
        }
 
        @Test
@@ -366,9 +328,9 @@ public class MemoryManagerTest {
                }
        }
 
-       private void testCannotReserveAnymore(MemoryType type, long size) {
+       private void testCannotReserveAnymore(long size) {
                try {
-                       memoryManager.reserveMemory(new Object(), type, size);
+                       memoryManager.reserveMemory(new Object(), size);
                        Assert.fail("Expected MemoryAllocationException. " +
                                "We should not be able to any more memory after 
allocating or(and) reserving all memory of a certain type.");
                } catch (MemoryReservationException maex) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 52b00b8..01306f6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -148,7 +147,7 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
                this.inputs = new LinkedList<InputGate>();
                this.outputs = new LinkedList<ResultPartitionWriter>();
 
-               this.memManager = 
MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, 
offHeapMemorySize).build();
+               this.memManager = 
MemoryManagerBuilder.newBuilder().setMemorySize(offHeapMemorySize).build();
                this.ioManager = ioManager;
                this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java
deleted file mode 100644
index 0d431bd..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/KeyedBudgetManagerTest.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.runtime.util.KeyedBudgetManager.AcquisitionResult;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
-/**
- * Test suite for {@link KeyedBudgetManager}.
- */
-@SuppressWarnings("MagicNumber")
-public class KeyedBudgetManagerTest extends TestLogger {
-       private static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"};
-       private static final long[] TEST_BUDGETS = {15, 17, 22, 11};
-       private static final Executor NEW_THREAD_EXECUTOR = r -> new 
Thread(r).start();
-
-       private KeyedBudgetManager<String> keyedBudgetManager;
-
-       @Before
-       public void setup() {
-               keyedBudgetManager = createSimpleKeyedBudget();
-       }
-
-       @After
-       public void teardown() {
-               keyedBudgetManager.releaseAll();
-               checkNoKeyBudgetChange();
-       }
-
-       @Test
-       public void testSuccessfulAcquisitionForKey() {
-               long acquired = keyedBudgetManager.acquireBudgetForKey("k1", 
10L);
-
-               assertThat(acquired, is(10L));
-               checkOneKeyBudgetChange("k1", 5L);
-       }
-
-       @Test
-       public void testFailedAcquisitionForKey() {
-               long maxPossibleBudgetToAcquire = 
keyedBudgetManager.acquireBudgetForKey("k1", 20L);
-
-               assertThat(maxPossibleBudgetToAcquire, is(15L));
-               checkNoKeyBudgetChange();
-       }
-
-       @Test
-       public void testSuccessfulReleaseForKey() {
-               keyedBudgetManager.acquireBudgetForKey("k1", 10L);
-               keyedBudgetManager.releaseBudgetForKey("k1", 5L);
-
-               checkOneKeyBudgetChange("k1", 10L);
-       }
-
-       @Test
-       public void testFailedReleaseForKey() {
-               keyedBudgetManager.acquireBudgetForKey("k1", 10L);
-               try {
-                       keyedBudgetManager.releaseBudgetForKey("k1", 15L);
-                       fail("IllegalStateException is expected to fail 
over-sized release");
-               } catch (IllegalStateException e) {
-                       // expected
-               }
-
-               checkOneKeyBudgetChange("k1", 5L);
-       }
-
-       @Test
-       public void testSuccessfulAcquisitionForKeys() {
-               AcquisitionResult<String> acquired = acquireForMultipleKeys(5L);
-
-               assertThat(checkAcquisitionSuccess(acquired, 4L), is(true));
-
-               assertThat(keyedBudgetManager.availableBudgetForKey("k1"), 
is(15L));
-               
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", 
"k3")), is(19L));
-               assertThat(keyedBudgetManager.totalAvailableBudget(), is(45L));
-       }
-
-       @Test
-       public void testConcurrentAcquisitionForKeys() throws 
ExecutionException, InterruptedException {
-               long pageSize = 5L;
-               CompletableFuture<AcquisitionResult<String>> allocation1 = 
acquireForMultipleKeysAsync(pageSize);
-               CompletableFuture<Long> availableBudgetForKeysFuture = 
getAvailableBudgetForKeysAsync();
-               CompletableFuture<AcquisitionResult<String>> allocation2 = 
acquireForMultipleKeysAsync(pageSize);
-               Arrays
-                       .asList(allocation1, allocation2, 
availableBudgetForKeysFuture)
-                       .forEach(KeyedBudgetManagerTest::waitForFutureSilently);
-
-               boolean firstSucceeded = 
checkFirstAcquisitionSucceeded(allocation1, allocation2);
-               boolean secondSucceeded = 
checkFirstAcquisitionSucceeded(allocation2, allocation1);
-               assertThat(firstSucceeded || secondSucceeded, is(true));
-
-               long availableBudgetForKeys = 
availableBudgetForKeysFuture.get();
-               assertThat(availableBudgetForKeys == 39L || 
availableBudgetForKeys == 19L, is(true));
-       }
-
-       @Test
-       public void testConcurrentReleaseForKeys() throws ExecutionException, 
InterruptedException {
-               long pageSize = 5L;
-               Map<String, Long> sizeByKey = acquireForMultipleKeys(pageSize)
-                       .getAcquiredPerKey()
-                       .entrySet()
-                       .stream()
-                       .collect(Collectors.toMap(Entry::getKey, e -> 
e.getValue() * pageSize));
-
-               CompletableFuture<Void> release1 = releaseKeysAsync(sizeByKey);
-               CompletableFuture<Long> availableBudgetForKeysFuture = 
getAvailableBudgetForKeysAsync();
-               CompletableFuture<Void> release2 = releaseKeysAsync(sizeByKey);
-               Arrays
-                       .asList(release1, availableBudgetForKeysFuture, 
release2)
-                       .forEach(KeyedBudgetManagerTest::waitForFutureSilently);
-
-               boolean firstSucceeded = !release1.isCompletedExceptionally() 
&& release2.isCompletedExceptionally();
-               boolean secondSucceeded = !release2.isCompletedExceptionally() 
&& release1.isCompletedExceptionally();
-               assertThat(firstSucceeded || secondSucceeded, is(true));
-
-               long availableBudgetForKeys = 
availableBudgetForKeysFuture.get();
-               assertThat(availableBudgetForKeys == 39L || 
availableBudgetForKeys == 19L, is(true));
-
-               checkNoKeyBudgetChange();
-       }
-
-       @Test
-       public void testFailedAcquisitionForKeys() {
-               AcquisitionResult<String> acquired =
-                       
keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 6, 6);
-
-               assertThat(acquired.isFailure(), is(true));
-               assertThat(acquired.getTotalAvailableForAllQueriedKeys(), 
is(5L));
-               checkNoKeyBudgetChange();
-       }
-
-       @Test
-       public void testSuccessfulReleaseForKeys() {
-               
keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8);
-               keyedBudgetManager.releaseBudgetForKeys(createdBudgetMap(new 
String[] {"k2", "k3"}, new long[] {7, 10}));
-
-               
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", 
"k3")), is(24L));
-               
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", 
"k4")), is(26L));
-               assertThat(keyedBudgetManager.totalAvailableBudget(), is(50L));
-       }
-
-       @Test
-       public void testSuccessfulReleaseForKeysWithMixedRequests() {
-               
keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8);
-               
keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k1", "k4"), 6, 3);
-               keyedBudgetManager.releaseBudgetForKeys(createdBudgetMap(new 
String[] {"k2", "k3"}, new long[] {7, 10}));
-
-               
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", 
"k3")), is(24L));
-               
assertThat(keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k1", 
"k4")), is(8L));
-               assertThat(keyedBudgetManager.totalAvailableBudget(), is(32L));
-       }
-
-       private void checkNoKeyBudgetChange() {
-               checkKeysBudgetChange(Collections.emptyMap());
-       }
-
-       private void checkOneKeyBudgetChange(
-                       @SuppressWarnings("SameParameterValue") String key,
-                       long budget) {
-               checkKeysBudgetChange(Collections.singletonMap(key, budget));
-       }
-
-       private void checkKeysBudgetChange(
-                       Map<String, Long> changedBudgetPerKey) {
-               long totalExpectedBudget = 0L;
-               for (int i = 0; i < TEST_KEYS.length; i++) {
-                       long expectedBudget = 
changedBudgetPerKey.containsKey(TEST_KEYS[i]) ?
-                               changedBudgetPerKey.get(TEST_KEYS[i]) : 
TEST_BUDGETS[i];
-                       
assertThat(keyedBudgetManager.availableBudgetForKey(TEST_KEYS[i]), 
is(expectedBudget));
-                       totalExpectedBudget += expectedBudget;
-               }
-               assertThat(keyedBudgetManager.maxTotalBudget(), 
is(LongStream.of(TEST_BUDGETS).sum()));
-               assertThat(keyedBudgetManager.totalAvailableBudget(), 
is(totalExpectedBudget));
-       }
-
-       private CompletableFuture<AcquisitionResult<String>> 
acquireForMultipleKeysAsync(long pageSize) {
-               return CompletableFuture.supplyAsync(() -> 
acquireForMultipleKeys(pageSize), NEW_THREAD_EXECUTOR);
-       }
-
-       private CompletableFuture<Long> getAvailableBudgetForKeysAsync() {
-               return CompletableFuture.supplyAsync(() -> 
keyedBudgetManager.availableBudgetForKeys(Arrays.asList("k2", "k3")), 
NEW_THREAD_EXECUTOR);
-       }
-
-       private AcquisitionResult<String> acquireForMultipleKeys(long pageSize) 
{
-               return 
keyedBudgetManager.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 
pageSize);
-       }
-
-       private CompletableFuture<Void> releaseKeysAsync(Map<String, Long> 
sizeByKey) {
-               return CompletableFuture.runAsync(() -> 
keyedBudgetManager.releaseBudgetForKeys(sizeByKey), NEW_THREAD_EXECUTOR);
-       }
-
-       private static boolean checkFirstAcquisitionSucceeded(
-               Future<AcquisitionResult<String>> allocation1,
-               Future<AcquisitionResult<String>> allocation2) throws 
ExecutionException, InterruptedException {
-               return checkAcquisitionSuccess(allocation1.get(), 4L) && 
allocation2.get().isFailure();
-       }
-
-       private static boolean checkAcquisitionSuccess(
-               AcquisitionResult<String> acquired,
-               @SuppressWarnings("SameParameterValue") long 
numberOfPageToAcquire) {
-               return acquired.isSuccess() &&
-                       
acquired.getAcquiredPerKey().values().stream().mapToLong(b -> b).sum() == 
numberOfPageToAcquire;
-       }
-
-       private static KeyedBudgetManager<String> createSimpleKeyedBudget() {
-               return new KeyedBudgetManager<>(createdBudgetMap(TEST_KEYS, 
TEST_BUDGETS), 1L);
-       }
-
-       private static Map<String, Long> createdBudgetMap(String[] keys, long[] 
budgets) {
-               Preconditions.checkArgument(keys.length == budgets.length);
-               Map<String, Long> keydBudgets = new HashMap<>();
-               for (int i = 0; i < keys.length; i++) {
-                       keydBudgets.put(keys[i], budgets[i]);
-               }
-               return keydBudgets;
-       }
-
-       private static void waitForFutureSilently(Future<?> future) {
-               try {
-                       future.get();
-               } catch (InterruptedException | ExecutionException e) {
-                       // silent
-               }
-       }
-}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 181f701..359f908 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -159,7 +158,7 @@ public class StreamMockEnvironment implements Environment {
                this.taskConfiguration = taskConfig;
                this.inputs = new LinkedList<InputGate>();
                this.outputs = new LinkedList<ResultPartitionWriter>();
-               this.memManager = 
MemoryManagerBuilder.newBuilder().setMemorySize(MemoryType.OFF_HEAP, 
offHeapMemorySize).build();
+               this.memManager = 
MemoryManagerBuilder.newBuilder().setMemorySize(offHeapMemorySize).build();
                this.ioManager = new IOManagerAsync();
                this.taskStateManager = 
Preconditions.checkNotNull(taskStateManager);
                this.aggregateManager = new TestGlobalAggregateManager();

Reply via email to