This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d16393feb32fb6470d0d6ea7ef5b6b9fac89ca21
Author: Jark Wu <[email protected]>
AuthorDate: Thu Jan 14 17:16:42 2021 +0800

    [hotfix][table-runtime-blink] Make MemoryManager not null in LazyMemorySegmentPool
---
 .../table/runtime/util/LazyMemorySegmentPool.java  | 45 +++++-----------------
 .../runtime/util/collections/binary/BytesMap.java  |  3 +-
 2 files changed, 11 insertions(+), 37 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java
index 54b09b4..3f39073 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/LazyMemorySegmentPool.java
@@ -19,13 +19,10 @@
 package org.apache.flink.table.runtime.util;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.memory.MemoryAllocationException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.util.Preconditions;
 
-import javax.annotation.Nullable;
-
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -37,37 +34,26 @@ public class LazyMemorySegmentPool implements 
MemorySegmentPool, Closeable {
     private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
 
     private final Object owner;
-    private final @Nullable MemoryManager memoryManager;
+    private final MemoryManager memoryManager;
     private final ArrayList<MemorySegment> cachePages;
     private final int maxPages;
     private final int perRequestPages;
 
     private int pageUsage;
 
-    public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, 
long memorySize) {
-        this(
-                owner,
-                memoryManager,
-                (int) memorySize
-                        / (memoryManager == null
-                                ? MemoryManager.DEFAULT_PAGE_SIZE
-                                : memoryManager.getPageSize()));
-    }
-
     public LazyMemorySegmentPool(Object owner, MemoryManager memoryManager, 
int maxPages) {
         this.owner = owner;
         this.memoryManager = memoryManager;
         this.cachePages = new ArrayList<>();
         this.maxPages = maxPages;
         this.pageUsage = 0;
-        this.perRequestPages = Math.max(1, (int) (PER_REQUEST_MEMORY_SIZE / 
pageSize()));
+        this.perRequestPages =
+                Math.max(1, (int) (PER_REQUEST_MEMORY_SIZE / 
memoryManager.getPageSize()));
     }
 
     @Override
     public int pageSize() {
-        return memoryManager == null
-                ? MemoryManager.DEFAULT_PAGE_SIZE
-                : memoryManager.getPageSize();
+        return this.memoryManager.getPageSize();
     }
 
     @Override
@@ -92,19 +78,10 @@ public class LazyMemorySegmentPool implements 
MemorySegmentPool, Closeable {
 
         if (this.cachePages.isEmpty()) {
             int numPages = Math.min(freePages, this.perRequestPages);
-            // allocate from non-managed heap memory
-            if (memoryManager == null) {
-                for (int i = 0; i < numPages; i++) {
-                    cachePages.add(
-                            MemorySegmentFactory.allocateUnpooledSegment(
-                                    MemoryManager.DEFAULT_PAGE_SIZE));
-                }
-            } else {
-                try {
-                    this.memoryManager.allocatePages(owner, this.cachePages, 
numPages);
-                } catch (MemoryAllocationException e) {
-                    throw new RuntimeException(e);
-                }
+            try {
+                this.memoryManager.allocatePages(owner, this.cachePages, 
numPages);
+            } catch (MemoryAllocationException e) {
+                throw new RuntimeException(e);
             }
         }
         this.pageUsage++;
@@ -148,10 +125,6 @@ public class LazyMemorySegmentPool implements 
MemorySegmentPool, Closeable {
     }
 
     public void cleanCache() {
-        if (memoryManager == null) {
-            this.cachePages.clear();
-        } else {
-            this.memoryManager.release(this.cachePages);
-        }
+        this.memoryManager.release(this.cachePages);
     }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java
index f4db530..ffe32dc 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/BytesMap.java
@@ -94,7 +94,8 @@ public abstract class BytesMap<K, V> {
             MemoryManager memoryManager,
             long memorySize,
             TypeSerializer<K> keySerializer) {
-        this.memoryPool = new LazyMemorySegmentPool(owner, memoryManager, 
memorySize);
+        int maxPages = (int) (memorySize / memoryManager.getPageSize());
+        this.memoryPool = new LazyMemorySegmentPool(owner, memoryManager, 
maxPages);
         this.segmentSize = memoryPool.pageSize();
         this.reservedNumBuffers = (int) (memorySize / segmentSize);
         this.numBucketsPerSegment = segmentSize / BUCKET_SIZE;

Reply via email to