This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 962a0867e [core] Add withMemoryPool for TableWrite (#1074)
962a0867e is described below

commit 962a0867e6b6d54b0432b975d4527cd24a1d6d23
Author: Shammon FY <[email protected]>
AuthorDate: Tue May 9 10:30:39 2023 +0800

    [core] Add withMemoryPool for TableWrite (#1074)
---
 .../paimon/operation/AbstractFileStoreWrite.java   |  6 +++++
 .../apache/paimon/operation/FileStoreWrite.java    |  8 +++++++
 .../paimon/operation/MemoryFileStoreWrite.java     | 26 ++++++++++++++++++----
 .../org/apache/paimon/table/sink/TableWrite.java   |  4 ++++
 .../apache/paimon/table/sink/TableWriteImpl.java   |  7 ++++++
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  5 +++++
 6 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index a761aefbf..20dc16af2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.CommitIncrement;
@@ -83,6 +84,11 @@ public abstract class AbstractFileStoreWrite<T>
         return this;
     }
 
+    @Override
+    public FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
+        return this;
+    }
+
     public void withOverwrite(boolean overwrite) {
         this.overwrite = overwrite;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 5f516018d..dbd737b88 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -22,6 +22,7 @@ import org.apache.paimon.FileStore;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.utils.RecordWriter;
@@ -38,6 +39,13 @@ public interface FileStoreWrite<T> {
 
     FileStoreWrite<T> withIOManager(IOManager ioManager);
 
+    /**
+     * With memory pool for the current file store write.
+     *
+     * @param memoryPool the given memory pool.
+     */
+    FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool);
+
     /**
      * If overwrite is true, the writer will overwrite the store, otherwise it won't.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 8d1c214c2..40b8da562 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -23,11 +23,15 @@ import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Iterator;
 import java.util.Map;
 
@@ -40,9 +44,11 @@ import static org.apache.paimon.CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE;
  * @param <T> type of record to write.
  */
 public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(MemoryFileStoreWrite.class);
 
-    private final MemoryPoolFactory writeBufferPool;
+    private final CoreOptions options;
     protected final CacheManager cacheManager;
+    private MemoryPoolFactory writeBufferPool;
 
     public MemoryFileStoreWrite(
             String commitUser,
@@ -50,15 +56,19 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T>
             FileStoreScan scan,
             CoreOptions options) {
         super(commitUser, snapshotManager, scan);
-        HeapMemorySegmentPool memoryPool =
-                new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize());
-        this.writeBufferPool = new MemoryPoolFactory(memoryPool, this::memoryOwners);
+        this.options = options;
         this.cacheManager =
                 new CacheManager(
                         options.pageSize(),
                         options.toConfiguration().get(LOOKUP_CACHE_MAX_MEMORY_SIZE));
     }
 
+    @Override
+    public MemoryFileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
+        this.writeBufferPool = new MemoryPoolFactory(memoryPool, this::memoryOwners);
+        return this;
+    }
+
     private Iterator<MemoryOwner> memoryOwners() {
         Iterator<Map<Integer, WriterContainer<T>>> iterator = writers.values().iterator();
         return Iterators.concat(
@@ -88,6 +98,14 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T>
                             + " but this is: "
                             + writer.getClass());
         }
+        if (writeBufferPool == null) {
+            LOG.debug("Use default heap memory segment pool for write buffer.");
+            writeBufferPool =
+                    new MemoryPoolFactory(
+                            new HeapMemorySegmentPool(
+                                    options.writeBufferSize(), options.pageSize()),
+                            this::memoryOwners);
+        }
         writeBufferPool.notifyNewOwner((MemoryOwner) writer);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index 3e1506fd3..57bfbd190 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.table.Table;
 
 /**
@@ -35,6 +36,9 @@ public interface TableWrite extends AutoCloseable {
     /** With {@link IOManager}, this is needed if 'write-buffer-spillable' is set to true. */
     TableWrite withIOManager(IOManager ioManager);
 
+    /** With {@link MemorySegmentPool} for the current table write. */
+    TableWrite withMemoryPool(MemorySegmentPool memoryPool);
+
     /** Calculate which partition {@code row} belongs to. */
     BinaryRow getPartition(InternalRow row);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 79acf0ecd..cd6dfec0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.utils.Restorable;
@@ -66,6 +67,12 @@ public class TableWriteImpl<T>
         return this;
     }
 
+    @Override
+    public TableWriteImpl<T> withMemoryPool(MemorySegmentPool memoryPool) {
+        write.withMemoryPool(memoryPool);
+        return this;
+    }
+
     @Override
     public BinaryRow getPartition(InternalRow row) {
         keyAndBucketExtractor.setRecord(row);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 18ecc44a0..4d2fb4c81 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -70,6 +71,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                         (part, bucket) ->
                                 state.stateValueFilter().filter(table.name(), part, bucket))
                 .withIOManager(new IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
+                .withMemoryPool(
+                        new HeapMemorySegmentPool(
+                                table.coreOptions().writeBufferSize(),
+                                table.coreOptions().pageSize()))
                 .withOverwrite(isOverwrite);
     }
 

Reply via email to