This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d4e9dab0f5 [core] Introduce auto-buffer-spill for postpone bucket write (#5744)
d4e9dab0f5 is described below
commit d4e9dab0f539ff921c4b9a4f7e32da69229db59b
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 16 10:31:54 2025 +0800
[core] Introduce auto-buffer-spill for postpone bucket write (#5744)
---
.../java/org/apache/paimon/KeyValueFileStore.java | 2 +
.../org/apache/paimon/append/AppendOnlyWriter.java | 216 +++----------------
.../paimon/operation/AbstractFileStoreWrite.java | 2 +-
.../postpone/PostponeBucketFileStoreWrite.java | 71 ++++++-
.../paimon/postpone/PostponeBucketWriter.java | 141 +++++++++++--
.../java/org/apache/paimon/utils/SinkWriter.java | 234 +++++++++++++++++++++
.../paimon/table/PrimaryKeySimpleTableTest.java | 38 ++++
7 files changed, 495 insertions(+), 209 deletions(-)
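
For readers skimming the patch, here is a small self-contained sketch (not part of the commit; the class and method names below are illustrative only) of the write-path pattern this change introduces: records go into an in-memory buffer first, and when the buffer rejects a record the writer either spills buffered data to disk or flushes it into data files and retries.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative only: mirrors the "buffer first, spill or flush on pressure" pattern. */
public class BufferSpillSketch {

    private final Deque<String> memoryBuffer = new ArrayDeque<>();
    private final List<String> spilledToDisk = new ArrayList<>(); // stands in for spill files
    private final List<String> flushedFiles = new ArrayList<>();  // stands in for data files
    private final int memoryCapacity;
    private final boolean spillable;

    public BufferSpillSketch(int memoryCapacity, boolean spillable) {
        this.memoryCapacity = memoryCapacity;
        this.spillable = spillable;
    }

    /** Same retry shape as the patched write path: buffer, and on rejection flush then retry. */
    public void write(String record) {
        if (tryBuffer(record)) {
            return;
        }
        flush();
        if (!tryBuffer(record)) {
            throw new IllegalStateException("Buffer too small to hold a single record");
        }
    }

    private boolean tryBuffer(String record) {
        if (memoryBuffer.size() < memoryCapacity) {
            memoryBuffer.add(record);
            return true;
        }
        if (spillable) {
            // a spillable buffer moves in-memory data to disk instead of forcing a flush
            spilledToDisk.add(memoryBuffer.poll());
            memoryBuffer.add(record);
            return true;
        }
        return false;
    }

    /** Drains buffered (and spilled) records into one "file", like a SinkWriter flush. */
    public void flush() {
        flushedFiles.add("file with " + (spilledToDisk.size() + memoryBuffer.size()) + " records");
        spilledToDisk.clear();
        memoryBuffer.clear();
    }

    public static void main(String[] args) {
        BufferSpillSketch writer = new BufferSpillSketch(4, true);
        for (int i = 0; i < 10; i++) {
            writer.write("record-" + i);
        }
        writer.flush();
        System.out.println(writer.flushedFiles); // one file holding all 10 records
    }
}
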
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 81039da369..be4416af9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -177,12 +177,14 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
if (options.bucket() == BucketMode.POSTPONE_BUCKET) {
return new PostponeBucketFileStoreWrite(
fileIO,
+ pathFactory(),
schema,
commitUser,
partitionType,
keyType,
valueType,
this::pathFactory,
+ newReaderFactoryBuilder(),
snapshotManager(),
newScan(ScanType.FOR_WRITE).withManifestCacheFilter(manifestFilter),
options,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 694937c88f..3534ac4e9e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -23,7 +23,6 @@ import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.RowBuffer;
import org.apache.paimon.fileindex.FileIndexOptions;
@@ -45,14 +44,15 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BatchRecordWriter;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.IOFunction;
-import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.LongCounter;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.SinkWriter;
+import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
+import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -72,7 +72,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
private final RowType writeSchema;
private final DataFilePathFactory pathFactory;
private final CompactManager compactManager;
-    private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> bucketFileRead;
+    private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead;
private final boolean forceCompact;
private final boolean asyncFileWrite;
private final boolean statsDenseStore;
@@ -80,17 +80,17 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
- @Nullable private CompactDeletionFile compactDeletionFile;
private final LongCounter seqNumCounter;
private final String fileCompression;
private final CompressOptions spillCompression;
- private SinkWriter sinkWriter;
private final SimpleColStatsCollector.Factory[] statsCollectors;
@Nullable private final IOManager ioManager;
private final FileIndexOptions fileIndexOptions;
+ private final MemorySize maxDiskSize;
+ @Nullable private CompactDeletionFile compactDeletionFile;
+ private SinkWriter<InternalRow> sinkWriter;
private MemorySegmentPool memorySegmentPool;
- private final MemorySize maxDiskSize;
public AppendOnlyWriter(
FileIO fileIO,
@@ -101,7 +101,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
RowType writeSchema,
long maxSequenceNumber,
CompactManager compactManager,
-            IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> bucketFileRead,
+            IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> dataFileRead,
boolean forceCompact,
DataFilePathFactory pathFactory,
@Nullable CommitIncrement increment,
@@ -121,7 +121,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
this.writeSchema = writeSchema;
this.pathFactory = pathFactory;
this.compactManager = compactManager;
- this.bucketFileRead = bucketFileRead;
+ this.dataFileRead = dataFileRead;
this.forceCompact = forceCompact;
this.asyncFileWrite = asyncFileWrite;
this.statsDenseStore = statsDenseStore;
@@ -139,8 +139,8 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
this.sinkWriter =
useWriteBuffer
-                        ? new BufferedSinkWriter(spillable, maxDiskSize, spillCompression)
- : new DirectSinkWriter();
+ ? createBufferedSinkWriter(spillable)
+ : new DirectSinkWriter<>(this::createRollingRowWriter);
if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
@@ -151,6 +151,18 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
}
}
+    private BufferedSinkWriter<InternalRow> createBufferedSinkWriter(boolean spillable) {
+ return new BufferedSinkWriter<>(
+ this::createRollingRowWriter,
+ t -> t,
+ t -> t,
+ ioManager,
+ writeSchema,
+ spillable,
+ maxDiskSize,
+ spillCompression);
+ }
+
@Override
public void write(InternalRow rowData) throws Exception {
Preconditions.checkArgument(
@@ -178,7 +190,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
write(row);
}
} else {
- ((DirectSinkWriter) sinkWriter).writeBundle(bundle);
+ ((DirectSinkWriter<?>) sinkWriter).writeBundle(bundle);
}
}
@@ -252,16 +264,16 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
}
public void toBufferedWriter() throws Exception {
-        if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && bucketFileRead != null) {
+        if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && dataFileRead != null) {
// fetch the written results
List<DataFileMeta> files = sinkWriter.flush();
sinkWriter.close();
-            sinkWriter = new BufferedSinkWriter(true, maxDiskSize, spillCompression);
+ sinkWriter = createBufferedSinkWriter(true);
sinkWriter.setMemoryPool(memorySegmentPool);
// rewrite small files
-            try (RecordReaderIterator<InternalRow> reader = bucketFileRead.apply(files)) {
+            try (RecordReaderIterator<InternalRow> reader = dataFileRead.apply(files)) {
while (reader.hasNext()) {
sinkWriter.write(reader.next());
}
@@ -356,7 +368,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
@VisibleForTesting
public RowBuffer getWriteBuffer() {
if (sinkWriter instanceof BufferedSinkWriter) {
- return ((BufferedSinkWriter) sinkWriter).writeBuffer;
+ return ((BufferedSinkWriter<?>) sinkWriter).rowBuffer();
} else {
return null;
}
@@ -366,176 +378,4 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
List<DataFileMeta> getNewFiles() {
return newFiles;
}
-
- /** Internal interface to Sink Data from input. */
- private interface SinkWriter {
-
- boolean write(InternalRow data) throws IOException;
-
- List<DataFileMeta> flush() throws IOException;
-
- boolean flushMemory() throws IOException;
-
- long memoryOccupancy();
-
- void close();
-
- void setMemoryPool(MemorySegmentPool memoryPool);
-
- boolean bufferSpillableWriter();
- }
-
- /**
-     * Directly sink data to file, no memory cache here, use OrcWriter/ParquetWrite/etc directly
- * write data. May cause out-of-memory.
- */
- private class DirectSinkWriter implements SinkWriter {
-
- private RowDataRollingFileWriter writer;
-
- @Override
- public boolean write(InternalRow data) throws IOException {
- if (writer == null) {
- writer = createRollingRowWriter();
- }
- writer.write(data);
- return true;
- }
-
- public void writeBundle(BundleRecords bundle) throws IOException {
- if (writer == null) {
- writer = createRollingRowWriter();
- }
- writer.writeBundle(bundle);
- }
-
- @Override
- public List<DataFileMeta> flush() throws IOException {
- List<DataFileMeta> flushedFiles = new ArrayList<>();
- if (writer != null) {
- writer.close();
- flushedFiles.addAll(writer.result());
- writer = null;
- }
- return flushedFiles;
- }
-
- @Override
- public boolean flushMemory() throws IOException {
- return false;
- }
-
- @Override
- public long memoryOccupancy() {
- return 0;
- }
-
- @Override
- public void close() {
- if (writer != null) {
- writer.abort();
- writer = null;
- }
- }
-
- @Override
- public void setMemoryPool(MemorySegmentPool memoryPool) {
- // do nothing
- }
-
- @Override
- public boolean bufferSpillableWriter() {
- return false;
- }
- }
-
- /**
-     * Use buffered writer, segment pooled from segment pool. When spillable, may delay checkpoint
- * acknowledge time. When non-spillable, may cause too many small files.
- */
- private class BufferedSinkWriter implements SinkWriter {
-
- private final boolean spillable;
-
- private final MemorySize maxDiskSize;
-
- private final CompressOptions compression;
-
- private RowBuffer writeBuffer;
-
- private BufferedSinkWriter(
-                boolean spillable, MemorySize maxDiskSize, CompressOptions compression) {
- this.spillable = spillable;
- this.maxDiskSize = maxDiskSize;
- this.compression = compression;
- }
-
- @Override
- public boolean write(InternalRow data) throws IOException {
- return writeBuffer.put(data);
- }
-
- @Override
- public List<DataFileMeta> flush() throws IOException {
- List<DataFileMeta> flushedFiles = new ArrayList<>();
- if (writeBuffer != null) {
- writeBuffer.complete();
- RowDataRollingFileWriter writer = createRollingRowWriter();
- IOException exception = null;
-            try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) {
- while (iterator.advanceNext()) {
- writer.write(iterator.getRow());
- }
- } catch (IOException e) {
- exception = e;
- } finally {
- if (exception != null) {
- IOUtils.closeQuietly(writer);
- // cleanup code that might throw another exception
- throw exception;
- }
- writer.close();
- }
- flushedFiles.addAll(writer.result());
- // reuse writeBuffer
- writeBuffer.reset();
- }
- return flushedFiles;
- }
-
- @Override
- public long memoryOccupancy() {
- return writeBuffer.memoryOccupancy();
- }
-
- @Override
- public void close() {
- if (writeBuffer != null) {
- writeBuffer.reset();
- writeBuffer = null;
- }
- }
-
- @Override
- public void setMemoryPool(MemorySegmentPool memoryPool) {
- writeBuffer =
- RowBuffer.getBuffer(
- ioManager,
- memoryPool,
- new InternalRowSerializer(writeSchema),
- spillable,
- maxDiskSize,
- compression);
- }
-
- @Override
- public boolean bufferSpillableWriter() {
- return spillable;
- }
-
- @Override
- public boolean flushMemory() throws IOException {
- return writeBuffer.flushMemory();
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index fad898da45..a7bac3f12a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -567,7 +567,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
}
@VisibleForTesting
- Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() {
+ public Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers() {
return writers;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index 09b3afdff2..97b5dfe7c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -26,11 +26,15 @@ import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.avro.AvroSchemaConverter;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
-import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.FileStoreWrite;
+import org.apache.paimon.operation.MemoryFileStoreWrite;
import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.ReaderSupplier;
+import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.types.RowType;
@@ -39,10 +43,17 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
@@ -52,25 +63,37 @@ import static org.apache.paimon.format.FileFormat.fileFormat;
 import static org.apache.paimon.utils.FileStorePathFactory.createFormatPathFactories;
/** {@link FileStoreWrite} for {@code bucket = -2} tables. */
-public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
+public class PostponeBucketFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PostponeBucketFileStoreWrite.class);
private final CoreOptions options;
private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
+ private final FileIO fileIO;
+ private final FileStorePathFactory pathFactory;
+ private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
+
+ private boolean forceBufferSpill = false;
public PostponeBucketFileStoreWrite(
FileIO fileIO,
+ FileStorePathFactory pathFactory,
TableSchema schema,
String commitUser,
RowType partitionType,
RowType keyType,
RowType valueType,
            BiFunction<CoreOptions, String, FileStorePathFactory> formatPathFactory,
+ KeyValueFileReaderFactory.Builder readerFactoryBuilder,
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
String tableName,
@Nullable Integer writeId) {
-        super(snapshotManager, scan, null, null, tableName, options, partitionType);
+        super(snapshotManager, scan, options, partitionType, null, null, tableName);
+ this.fileIO = fileIO;
+ this.pathFactory = pathFactory;
+ this.readerFactoryBuilder = readerFactoryBuilder;
Options newOptions = new Options(options.toMap());
try {
@@ -120,6 +143,25 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu
withIgnorePreviousFiles(true);
}
+ @Override
+ protected void forceBufferSpill() throws Exception {
+ if (ioManager == null) {
+ return;
+ }
+ if (forceBufferSpill) {
+ return;
+ }
+ forceBufferSpill = true;
+ LOG.info(
+                "Force buffer spill for postpone file store write, writer number is: {}",
+ writers.size());
+        for (Map<Integer, WriterContainer<KeyValue>> bucketWriters : writers.values()) {
+            for (WriterContainer<KeyValue> writerContainer : bucketWriters.values()) {
+                ((PostponeBucketWriter) writerContainer.writer).toBufferedWriter();
+ }
+ }
+ }
+
@Override
public void withIgnorePreviousFiles(boolean ignorePrevious) {
// see comments in constructor
@@ -141,7 +183,28 @@ public class PostponeBucketFileStoreWrite extends AbstractFileStoreWrite<KeyValu
"Postpone bucket writers should not restore previous files.
This is unexpected.");
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket, options);
- return new PostponeBucketWriter(writerFactory, restoreIncrement);
+ return new PostponeBucketWriter(
+ fileIO,
+ pathFactory.createDataFilePathFactory(partition, bucket),
+ options.spillCompressOptions(),
+ options.writeBufferSpillDiskSize(),
+ ioManager,
+ writerFactory,
+ files -> newFileRead(partition, bucket, files),
+ forceBufferSpill,
+ forceBufferSpill,
+ restoreIncrement);
+ }
+
+ private RecordReaderIterator<KeyValue> newFileRead(
+            BinaryRow partition, int bucket, List<DataFileMeta> files) throws IOException {
+ KeyValueFileReaderFactory readerFactory =
+                readerFactoryBuilder.build(partition, bucket, name -> Optional.empty());
+ List<ReaderSupplier<KeyValue>> suppliers = new ArrayList<>();
+ for (DataFileMeta file : files) {
+ suppliers.add(() -> readerFactory.createRecordReader(file));
+ }
+        return new RecordReaderIterator<>(ConcatRecordReader.create(suppliers));
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
index 10259650d4..c41db8aac7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketWriter.java
@@ -19,14 +19,29 @@
package org.apache.paimon.postpone;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.KeyValueSerializer;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.memory.MemoryOwner;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.IOFunction;
import org.apache.paimon.utils.RecordWriter;
+import org.apache.paimon.utils.SinkWriter;
+import org.apache.paimon.utils.SinkWriter.BufferedSinkWriter;
+import org.apache.paimon.utils.SinkWriter.DirectSinkWriter;
import javax.annotation.Nullable;
@@ -36,30 +51,69 @@ import java.util.Collections;
import java.util.List;
/** {@link RecordWriter} for {@code bucket = -2} tables. */
-public class PostponeBucketWriter implements RecordWriter<KeyValue> {
+public class PostponeBucketWriter implements RecordWriter<KeyValue>, MemoryOwner {
+ private final FileIO fileIO;
+ private final DataFilePathFactory pathFactory;
private final KeyValueFileWriterFactory writerFactory;
private final List<DataFileMeta> files;
+    private final IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead;
+ private final @Nullable IOManager ioManager;
+ private final CompressOptions spillCompression;
+ private final MemorySize maxDiskSize;
- private RollingFileWriter<KeyValue, DataFileMeta> writer;
+ private SinkWriter<KeyValue> sinkWriter;
+ private MemorySegmentPool memorySegmentPool;
public PostponeBucketWriter(
-            KeyValueFileWriterFactory writerFactory, @Nullable CommitIncrement restoreIncrement) {
+ FileIO fileIO,
+ DataFilePathFactory pathFactory,
+ CompressOptions spillCompression,
+ MemorySize maxDiskSize,
+ @Nullable IOManager ioManager,
+ KeyValueFileWriterFactory writerFactory,
+            IOFunction<List<DataFileMeta>, RecordReaderIterator<KeyValue>> fileRead,
+ boolean useWriteBuffer,
+ boolean spillable,
+ @Nullable CommitIncrement restoreIncrement) {
+ this.ioManager = ioManager;
this.writerFactory = writerFactory;
+ this.fileRead = fileRead;
+ this.fileIO = fileIO;
+ this.pathFactory = pathFactory;
+ this.spillCompression = spillCompression;
+ this.maxDiskSize = maxDiskSize;
this.files = new ArrayList<>();
if (restoreIncrement != null) {
files.addAll(restoreIncrement.newFilesIncrement().newFiles());
}
+ this.sinkWriter =
+ useWriteBuffer
+ ? createBufferedSinkWriter(spillable)
+ : new DirectSinkWriter<>(this::createRollingRowWriter);
+ }
- this.writer = null;
+    private RollingFileWriter<KeyValue, DataFileMeta> createRollingRowWriter() {
+        return writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
}
@Override
public void write(KeyValue record) throws Exception {
- if (writer == null) {
-            writer = writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND);
+ boolean success = sinkWriter.write(record);
+ if (!success) {
+ flush();
+ success = sinkWriter.write(record);
+ if (!success) {
+ // Should not get here, because writeBuffer will throw too big
exception out.
+ // But we throw again in case of something unexpected happens.
(like someone changed
+ // code in SpillableBuffer.)
+ throw new RuntimeException("Mem table is too small to hold a
single element.");
+ }
}
- writer.write(record);
+ }
+
+ private void flush() throws Exception {
+ files.addAll(sinkWriter.flush());
}
@Override
@@ -82,13 +136,66 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> {
}
@Override
-    public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
- if (writer != null) {
- writer.close();
- files.addAll(writer.result());
- writer = null;
+ public void setMemoryPool(MemorySegmentPool memoryPool) {
+ this.memorySegmentPool = memoryPool;
+ sinkWriter.setMemoryPool(memoryPool);
+ }
+
+ @Override
+ public long memoryOccupancy() {
+ return sinkWriter.memoryOccupancy();
+ }
+
+ @Override
+ public void flushMemory() throws Exception {
+ boolean success = sinkWriter.flushMemory();
+ if (!success) {
+ flush();
}
+ }
+    private BufferedSinkWriter<KeyValue> createBufferedSinkWriter(boolean spillable) {
+ RowType keyType = writerFactory.keyType();
+ RowType valueType = writerFactory.valueType();
+ RowType kvRowType = KeyValue.schema(keyType, valueType);
+        KeyValueSerializer serializer = new KeyValueSerializer(keyType, valueType);
+ return new BufferedSinkWriter<>(
+ this::createRollingRowWriter,
+ serializer::toRow,
+ serializer::fromRow,
+ ioManager,
+ kvRowType,
+ spillable,
+ maxDiskSize,
+ spillCompression);
+ }
+
+ public void toBufferedWriter() throws Exception {
+        if (sinkWriter != null && !sinkWriter.bufferSpillableWriter() && fileRead != null) {
+ // fetch the written results
+ List<DataFileMeta> files = sinkWriter.flush();
+
+ sinkWriter.close();
+ sinkWriter = createBufferedSinkWriter(true);
+ sinkWriter.setMemoryPool(memorySegmentPool);
+
+ // rewrite small files
+            try (RecordReaderIterator<KeyValue> reader = fileRead.apply(files)) {
+ while (reader.hasNext()) {
+ sinkWriter.write(reader.next());
+ }
+ } finally {
+ // remove small files
+ for (DataFileMeta file : files) {
+ fileIO.deleteQuietly(pathFactory.toPath(file));
+ }
+ }
+ }
+ }
+
+ @Override
+    public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
+ flush();
List<DataFileMeta> result = new ArrayList<>(files);
files.clear();
return new CommitIncrement(
@@ -97,6 +204,11 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> {
null);
}
+ @VisibleForTesting
+ public boolean useBufferedSinkWriter() {
+ return sinkWriter instanceof BufferedSinkWriter;
+ }
+
@Override
public boolean compactNotCompleted() {
return false;
@@ -107,9 +219,6 @@ public class PostponeBucketWriter implements RecordWriter<KeyValue> {
@Override
public void close() throws Exception {
- if (writer != null) {
- writer.abort();
- writer = null;
- }
+ sinkWriter.close();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
new file mode 100644
index 0000000000..c4596a1df4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SinkWriter.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.RowBuffer;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Internal interface to Sink Data from input. */
+public interface SinkWriter<T> {
+
+ boolean write(T data) throws IOException;
+
+ List<DataFileMeta> flush() throws IOException;
+
+ boolean flushMemory() throws IOException;
+
+ long memoryOccupancy();
+
+ void close();
+
+ void setMemoryPool(MemorySegmentPool memoryPool);
+
+ boolean bufferSpillableWriter();
+
+ /**
+     * Directly sink data to file, no memory cache here, use OrcWriter/ParquetWrite/etc directly
+ * write data. May cause out-of-memory.
+ */
+ class DirectSinkWriter<T> implements SinkWriter<T> {
+
+        private final Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier;
+
+ private RollingFileWriter<T, DataFileMeta> writer;
+
+        public DirectSinkWriter(Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier) {
+ this.writerSupplier = writerSupplier;
+ }
+
+ @Override
+ public boolean write(T data) throws IOException {
+ if (writer == null) {
+ writer = writerSupplier.get();
+ }
+ writer.write(data);
+ return true;
+ }
+
+ public void writeBundle(BundleRecords bundle) throws IOException {
+ if (writer == null) {
+ writer = writerSupplier.get();
+ }
+ writer.writeBundle(bundle);
+ }
+
+ @Override
+ public List<DataFileMeta> flush() throws IOException {
+ List<DataFileMeta> flushedFiles = new ArrayList<>();
+ if (writer != null) {
+ writer.close();
+ flushedFiles.addAll(writer.result());
+ writer = null;
+ }
+ return flushedFiles;
+ }
+
+ @Override
+ public boolean flushMemory() throws IOException {
+ return false;
+ }
+
+ @Override
+ public long memoryOccupancy() {
+ return 0;
+ }
+
+ @Override
+ public void close() {
+ if (writer != null) {
+ writer.abort();
+ writer = null;
+ }
+ }
+
+ @Override
+ public void setMemoryPool(MemorySegmentPool memoryPool) {
+ // do nothing
+ }
+
+ @Override
+ public boolean bufferSpillableWriter() {
+ return false;
+ }
+ }
+
+ /**
+     * Use buffered writer, segment pooled from segment pool. When spillable, may delay checkpoint
+ * acknowledge time. When non-spillable, may cause too many small files.
+ */
+ class BufferedSinkWriter<T> implements SinkWriter<T> {
+
+        private final Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier;
+ private final Function<T, InternalRow> toRow;
+ private final Function<InternalRow, T> fromRow;
+ private final IOManager ioManager;
+ private final RowType rowType;
+ private final boolean spillable;
+ private final MemorySize maxDiskSize;
+ private final CompressOptions compression;
+
+ private RowBuffer writeBuffer;
+
+ public BufferedSinkWriter(
+ Supplier<RollingFileWriter<T, DataFileMeta>> writerSupplier,
+ Function<T, InternalRow> toRow,
+ Function<InternalRow, T> fromRow,
+ IOManager ioManager,
+ RowType rowType,
+ boolean spillable,
+ MemorySize maxDiskSize,
+ CompressOptions compression) {
+ this.writerSupplier = writerSupplier;
+ this.toRow = toRow;
+ this.fromRow = fromRow;
+ this.ioManager = ioManager;
+ this.rowType = rowType;
+ this.spillable = spillable;
+ this.maxDiskSize = maxDiskSize;
+ this.compression = compression;
+ }
+
+ public RowBuffer rowBuffer() {
+ return writeBuffer;
+ }
+
+ @Override
+ public boolean write(T data) throws IOException {
+ return writeBuffer.put(toRow.apply(data));
+ }
+
+ @Override
+ public List<DataFileMeta> flush() throws IOException {
+ List<DataFileMeta> flushedFiles = new ArrayList<>();
+ if (writeBuffer != null) {
+ writeBuffer.complete();
+                RollingFileWriter<T, DataFileMeta> writer = writerSupplier.get();
+ IOException exception = null;
+                try (RowBuffer.RowBufferIterator iterator = writeBuffer.newIterator()) {
+ while (iterator.advanceNext()) {
+ writer.write(fromRow.apply(iterator.getRow()));
+ }
+ } catch (IOException e) {
+ exception = e;
+ } finally {
+ if (exception != null) {
+ IOUtils.closeQuietly(writer);
+ // cleanup code that might throw another exception
+ throw exception;
+ }
+ writer.close();
+ }
+ flushedFiles.addAll(writer.result());
+ // reuse writeBuffer
+ writeBuffer.reset();
+ }
+ return flushedFiles;
+ }
+
+ @Override
+ public long memoryOccupancy() {
+ return writeBuffer.memoryOccupancy();
+ }
+
+ @Override
+ public void close() {
+ if (writeBuffer != null) {
+ writeBuffer.reset();
+ writeBuffer = null;
+ }
+ }
+
+ @Override
+ public void setMemoryPool(MemorySegmentPool memoryPool) {
+ writeBuffer =
+ RowBuffer.getBuffer(
+ ioManager,
+ memoryPool,
+ new InternalRowSerializer(rowType),
+ spillable,
+ maxDiskSize,
+ compression);
+ }
+
+ @Override
+ public boolean bufferSpillableWriter() {
+ return spillable;
+ }
+
+ @Override
+ public boolean flushMemory() throws IOException {
+ return writeBuffer.flushMemory();
+ }
+ }
+}
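
The new SinkWriter interface above is the abstraction that both AppendOnlyWriter and PostponeBucketWriter now delegate to. As a quick orientation, here is a minimal caller sketch against the interface shown in this commit (the sketch itself, including the class name, helper method names, and the files list, is illustrative and not part of the patch): write() returns false when the buffer is full, so the caller flushes the produced data files and retries; flushMemory() is the lighter memory-pressure path.

import org.apache.paimon.KeyValue;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.utils.SinkWriter;

import java.io.IOException;
import java.util.List;

/** Illustrative caller of the SinkWriter contract introduced in this commit. */
class SinkWriterUsageSketch {

    /** Mirrors the patched write path: buffer the record, flush and retry when rejected. */
    static void writeOne(SinkWriter<KeyValue> sinkWriter, KeyValue record, List<DataFileMeta> files)
            throws IOException {
        if (sinkWriter.write(record)) {
            return;
        }
        // The buffer rejected the record: roll everything buffered so far into data files...
        files.addAll(sinkWriter.flush());
        // ...and retry; a second rejection means one record cannot fit into the buffer at all.
        if (!sinkWriter.write(record)) {
            throw new IOException("Write buffer is too small to hold a single element.");
        }
    }

    /** Mirrors the memory-pressure path: try to spill buffer pages, fall back to a full flush. */
    static void onMemoryPressure(SinkWriter<KeyValue> sinkWriter, List<DataFileMeta> files)
            throws IOException {
        if (!sinkWriter.flushMemory()) { // DirectSinkWriter always returns false here
            files.addAll(sinkWriter.flush());
        }
    }
}
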
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 0bf4971f94..35928e2335 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.LookupLocalFileType;
+import org.apache.paimon.KeyValue;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -34,9 +35,12 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
+import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
+import org.apache.paimon.postpone.PostponeBucketWriter;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
@@ -54,6 +58,7 @@ import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.WriteSelector;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
@@ -136,6 +141,39 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link PrimaryKeyFileStoreTable}. */
public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
+ @Test
+ public void testPostponeBucketWithManyPartitions() throws Exception {
+ FileStoreTable table =
+                createFileStoreTable(options -> options.set(BUCKET, BucketMode.POSTPONE_BUCKET));
+
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.withIOManager(new IOManagerImpl(tempDir.toString()));
+ for (int i = 0; i < 100; i++) {
+ write.write(rowData(i, i, (long) i));
+ }
+
+            for (Map<Integer, AbstractFileStoreWrite.WriterContainer<KeyValue>> bucketWriters :
+                    ((PostponeBucketFileStoreWrite) ((TableWriteImpl<?>) write).getWrite())
+                    .writers()
+                    .values()) {
+                for (AbstractFileStoreWrite.WriterContainer<KeyValue> writerContainer :
+                    bucketWriters.values()) {
+                    PostponeBucketWriter writer = (PostponeBucketWriter) writerContainer.writer;
+ assertThat(writer.useBufferedSinkWriter()).isTrue();
+ }
+ }
+ commit.commit(write.prepareCommit());
+ }
+
+ Snapshot snapshot = table.latestSnapshot().get();
+ ManifestFileMeta manifest =
+                table.manifestListReader().read(snapshot.deltaManifestList()).get(0);
+        List<ManifestEntry> entries = table.manifestFileReader().read(manifest.fileName());
+ assertThat(entries.size()).isEqualTo(100);
+ }
+
@Test
public void testPostponeBucket() throws Exception {
FileStoreTable table =