This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d16393feb32fb6470d0d6ea7ef5b6b9fac89ca21 Author: Jark Wu <[email protected]> AuthorDate: Thu Jan 14 17:16:42 2021 +0800 [hotfix][table-runtime-blink] Make MemoryManager not null in LazyMemorySegmentPool --- .../table/runtime/util/LazyMemorySegmentPool.java | 45 +++++----------------- .../runtime/util/collections/binary/BytesMap.java | 3 +- 2 files changed, 11 insertions(+), 37 deletions(-) diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java index 54b09b4..3f39073 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java @@ -19,13 +19,10 @@ package org.apache.flink.table.runtime.util; import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.memory.MemoryAllocationException; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.util.Preconditions; -import javax.annotation.Nullable; - import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; @@ -37,37 +34,26 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024; private final Object owner; - private final @Nullable MemoryManager memoryManager; + private final MemoryManager memoryManager; private final ArrayList<MemorySegment> cachePages; private final int maxPages; private final int perRequestPages; private int pageUsage; - public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, long memorySize) { - this( - owner, - memoryManager, - (int) memorySize - / (memoryManager == null - ? MemoryManager.DEFAULT_PAGE_SIZE - : memoryManager.getPageSize())); - } - public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, int maxPages) { this.owner = owner; this.memoryManager = memoryManager; this.cachePages = new ArrayList<>(); this.maxPages = maxPages; this.pageUsage = 0; - this.perRequestPages = Math.max(1, (int) (PER_REQUEST_MEMORY_SIZE / pageSize())); + this.perRequestPages = + Math.max(1, (int) (PER_REQUEST_MEMORY_SIZE / memoryManager.getPageSize())); } @Override public int pageSize() { - return memoryManager == null - ? MemoryManager.DEFAULT_PAGE_SIZE - : memoryManager.getPageSize(); + return this.memoryManager.getPageSize(); } @Override @@ -92,19 +78,10 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { if (this.cachePages.isEmpty()) { int numPages = Math.min(freePages, this.perRequestPages); - // allocate from non-managed heap memory - if (memoryManager == null) { - for (int i = 0; i < numPages; i++) { - cachePages.add( - MemorySegmentFactory.allocateUnpooledSegment( - MemoryManager.DEFAULT_PAGE_SIZE)); - } - } else { - try { - this.memoryManager.allocatePages(owner, this.cachePages, numPages); - } catch (MemoryAllocationException e) { - throw new RuntimeException(e); - } + try { + this.memoryManager.allocatePages(owner, this.cachePages, numPages); + } catch (MemoryAllocationException e) { + throw new RuntimeException(e); } } this.pageUsage++; @@ -148,10 +125,6 @@ public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable { } public void cleanCache() { - if (memoryManager == null) { - this.cachePages.clear(); - } else { - this.memoryManager.release(this.cachePages); - } + this.memoryManager.release(this.cachePages); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java index f4db530..ffe32dc 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java @@ -94,7 +94,8 @@ public abstract class BytesMap<K, V> { MemoryManager memoryManager, long memorySize, TypeSerializer<K> keySerializer) { - this.memoryPool = new LazyMemorySegmentPool(owner, memoryManager, memorySize); + int maxPages = (int) (memorySize / memoryManager.getPageSize()); + this.memoryPool = new LazyMemorySegmentPool(owner, memoryManager, maxPages); this.segmentSize = memoryPool.pageSize(); this.reservedNumBuffers = (int) (memorySize / segmentSize); this.numBucketsPerSegment = segmentSize / BUCKET_SIZE;
