This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 962a0867e [core] Add withMemoryPool for TableWrite (#1074)
962a0867e is described below
commit 962a0867e6b6d54b0432b975d4527cd24a1d6d23
Author: Shammon FY <[email protected]>
AuthorDate: Tue May 9 10:30:39 2023 +0800
[core] Add withMemoryPool for TableWrite (#1074)
---
.../paimon/operation/AbstractFileStoreWrite.java | 6 +++++
.../apache/paimon/operation/FileStoreWrite.java | 8 +++++++
.../paimon/operation/MemoryFileStoreWrite.java | 26 ++++++++++++++++++----
.../org/apache/paimon/table/sink/TableWrite.java | 4 ++++
.../apache/paimon/table/sink/TableWriteImpl.java | 7 ++++++
.../paimon/flink/sink/StoreSinkWriteImpl.java | 5 +++++
6 files changed, 52 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index a761aefbf..20dc16af2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
@@ -83,6 +84,11 @@ public abstract class AbstractFileStoreWrite<T>
return this;
}
+ @Override
+ public FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {
+ return this;
+ }
+
public void withOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 5f516018d..dbd737b88 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -22,6 +22,7 @@ import org.apache.paimon.FileStore;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.utils.RecordWriter;
@@ -38,6 +39,13 @@ public interface FileStoreWrite<T> {
FileStoreWrite<T> withIOManager(IOManager ioManager);
+ /**
+ * With memory pool for the current file store write.
+ *
+ * @param memoryPool the given memory pool.
+ */
+ FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool);
+
/**
* If overwrite is true, the writer will overwrite the store, otherwise it won't.
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 8d1c214c2..40b8da562 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -23,11 +23,15 @@ import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Iterator;
import java.util.Map;
@@ -40,9 +44,11 @@ import static org.apache.paimon.CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE;
* @param <T> type of record to write.
*/
public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T> {
+ private static final Logger LOG =
LoggerFactory.getLogger(MemoryFileStoreWrite.class);
- private final MemoryPoolFactory writeBufferPool;
+ private final CoreOptions options;
protected final CacheManager cacheManager;
+ private MemoryPoolFactory writeBufferPool;
public MemoryFileStoreWrite(
String commitUser,
@@ -50,15 +56,19 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T>
FileStoreScan scan,
CoreOptions options) {
super(commitUser, snapshotManager, scan);
- HeapMemorySegmentPool memoryPool =
- new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize());
- this.writeBufferPool = new MemoryPoolFactory(memoryPool,
this::memoryOwners);
+ this.options = options;
this.cacheManager =
new CacheManager(
options.pageSize(),
options.toConfiguration().get(LOOKUP_CACHE_MAX_MEMORY_SIZE));
}
+ @Override
+ public MemoryFileStoreWrite<T> withMemoryPool(MemorySegmentPool
memoryPool) {
+ this.writeBufferPool = new MemoryPoolFactory(memoryPool,
this::memoryOwners);
+ return this;
+ }
+
private Iterator<MemoryOwner> memoryOwners() {
Iterator<Map<Integer, WriterContainer<T>>> iterator =
writers.values().iterator();
return Iterators.concat(
@@ -88,6 +98,14 @@ public abstract class MemoryFileStoreWrite<T> extends AbstractFileStoreWrite<T>
+ " but this is: "
+ writer.getClass());
}
+ if (writeBufferPool == null) {
+ LOG.debug("Use default heap memory segment pool for write
buffer.");
+ writeBufferPool =
+ new MemoryPoolFactory(
+ new HeapMemorySegmentPool(
+ options.writeBufferSize(),
options.pageSize()),
+ this::memoryOwners);
+ }
writeBufferPool.notifyNewOwner((MemoryOwner) writer);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
index 3e1506fd3..57bfbd190 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWrite.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.table.Table;
/**
@@ -35,6 +36,9 @@ public interface TableWrite extends AutoCloseable {
/** With {@link IOManager}, this is needed if 'write-buffer-spillable' is set to true. */
TableWrite withIOManager(IOManager ioManager);
+ /** With {@link MemorySegmentPool} for the current table write. */
+ TableWrite withMemoryPool(MemorySegmentPool memoryPool);
+
/** Calculate which partition {@code row} belongs to. */
BinaryRow getPartition(InternalRow row);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 79acf0ecd..cd6dfec0d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.utils.Restorable;
@@ -66,6 +67,12 @@ public class TableWriteImpl<T>
return this;
}
+ @Override
+ public TableWriteImpl<T> withMemoryPool(MemorySegmentPool memoryPool) {
+ write.withMemoryPool(memoryPool);
+ return this;
+ }
+
@Override
public BinaryRow getPartition(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 18ecc44a0..4d2fb4c81 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
@@ -70,6 +71,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
(part, bucket) ->
state.stateValueFilter().filter(table.name(),
part, bucket))
.withIOManager(new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
+ .withMemoryPool(
+ new HeapMemorySegmentPool(
+ table.coreOptions().writeBufferSize(),
+ table.coreOptions().pageSize()))
.withOverwrite(isOverwrite);
}