This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a51c10929 [core] Refactor Multiple Partitions Write for Append table 
(#1994)
a51c10929 is described below

commit a51c10929d99cd3d2964c28ce1bcaec450f4a9ae
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Sep 12 22:43:12 2023 +0800

    [core] Refactor Multiple Partitions Write for Append table (#1994)
---
 docs/content/concepts/append-only-table.md         | 25 +++-----------
 .../main/java/org/apache/paimon/CoreOptions.java   |  2 +-
 .../org/apache/paimon/append/AppendOnlyWriter.java | 39 +++++++++++-----------
 .../org/apache/paimon/disk/ExternalBuffer.java     | 13 ++++++--
 .../org/apache/paimon/disk/InMemoryBuffer.java     |  5 +--
 .../{InternalRowBuffer.java => RowBuffer.java}     |  8 ++---
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  2 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |  6 ++--
 .../org/apache/paimon/disk/ExternalBufferTest.java |  7 ++--
 .../org/apache/paimon/disk/InMemoryBufferTest.java |  4 +--
 .../sink/index/GlobalIndexAssignerOperator.java    |  9 +++--
 11 files changed, 55 insertions(+), 65 deletions(-)

diff --git a/docs/content/concepts/append-only-table.md 
b/docs/content/concepts/append-only-table.md
index e8a87b699..b3bfa8168 100644
--- a/docs/content/concepts/append-only-table.md
+++ b/docs/content/concepts/append-only-table.md
@@ -279,27 +279,12 @@ CREATE TABLE MyTable (
 {{< /tabs >}}
 
 ## Multiple Partitions Write
+
 While writing multiple partitions in a single insert job, we may get an 
out-of-memory error if
 too many records arrived between two checkpoint.
 
-On 0.6 version, introduced a `write-buffer-for-append` option for append-only 
table. Setting this
-parameter to true, we will cache the records use Segment Pool to avoid OOM. 
-
-### Influence
-If we also set `write-buffer-spillable` to true, we can spill the records to 
disk by serializer. 
-But this may cause the checkpoint acknowledge delay and have an influence on 
sink speed.
-
-But if we don't set `write-buffer-spillable`, once we run out of memory in 
segment poll, we will flush
-them to filesystem as a complete data file. This may cause too many small 
files, and make more pressure on 
-compaction. We need to trade off carefully.
+You can use the `write-buffer-for-append` option for append-only tables. When this parameter is set to true, the writer
+caches records using the Segment Pool to avoid OOM.
 
-### Example
-```sql
-CREATE TABLE MyTable (
-    product_id BIGINT,
-    price DOUBLE,
-    sales BIGINT
-) WITH (
-    'write-buffer-for-append' = 'true'
-);
-```
\ No newline at end of file
+You can also set `write-buffer-spillable` to true so that the writer can spill records to disk. This reduces the number
+of small files as much as possible.
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index e4f233ef2..84723a2fd 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1014,7 +1014,7 @@ public class CoreOptions implements Serializable {
         return 
options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || 
!isStreaming);
     }
 
-    public boolean useWriteBuffer() {
+    public boolean useWriteBufferForAppend() {
         return options.get(WRITE_BUFFER_FOR_APPEND);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 7b0909568..51ddbbed0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.InternalRowBuffer;
+import org.apache.paimon.disk.RowBuffer;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.CompactIncrement;
@@ -70,7 +70,6 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
     private final String fileCompression;
-    private final boolean spillable;
     private final SinkWriter sinkWriter;
     private final FieldStatsCollector.Factory[] statsCollectors;
     private final IOManager ioManager;
@@ -104,11 +103,11 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
         this.compactAfter = new ArrayList<>();
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
         this.fileCompression = fileCompression;
-        this.spillable = spillable;
         this.ioManager = ioManager;
         this.statsCollectors = statsCollectors;
 
-        sinkWriter = createSinkWrite(useWriteBuffer);
+        this.sinkWriter =
+                useWriteBuffer ? new BufferedSinkWriter(spillable) : new 
DirectSinkWriter();
 
         if (increment != null) {
             newFiles.addAll(increment.newFilesIncrement().newFiles());
@@ -190,14 +189,6 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
         sinkWriter.close();
     }
 
-    private SinkWriter createSinkWrite(boolean useWriteBuffer) {
-        if (useWriteBuffer) {
-            return new BufferedSinkWriter();
-        } else {
-            return new DirectSinkWriter();
-        }
-    }
-
     private RowDataRollingFileWriter createRollingRowWriter() {
         return new RowDataRollingFileWriter(
                 fileIO,
@@ -254,7 +245,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
     }
 
     @VisibleForTesting
-    InternalRowBuffer getWriteBuffer() {
+    RowBuffer getWriteBuffer() {
         if (sinkWriter instanceof BufferedSinkWriter) {
             return ((BufferedSinkWriter) sinkWriter).writeBuffer;
         } else {
@@ -268,7 +259,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
     }
 
     /** Internal interface to Sink Data from input. */
-    interface SinkWriter {
+    private interface SinkWriter {
 
         boolean write(InternalRow data) throws IOException;
 
@@ -287,10 +278,13 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
      */
     private class DirectSinkWriter implements SinkWriter {
 
-        private RowDataRollingFileWriter writer = createRollingRowWriter();
+        private RowDataRollingFileWriter writer;
 
         @Override
         public boolean write(InternalRow data) throws IOException {
+            if (writer == null) {
+                writer = createRollingRowWriter();
+            }
             writer.write(data);
             return true;
         }
@@ -301,7 +295,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
             if (writer != null) {
                 writer.close();
                 flushedFiles.addAll(writer.result());
-                writer = createRollingRowWriter();
+                writer = null;
             }
             return flushedFiles;
         }
@@ -331,7 +325,13 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
      */
     private class BufferedSinkWriter implements SinkWriter {
 
-        InternalRowBuffer writeBuffer;
+        private final boolean spillable;
+
+        private RowBuffer writeBuffer;
+
+        private BufferedSinkWriter(boolean spillable) {
+            this.spillable = spillable;
+        }
 
         @Override
         public boolean write(InternalRow data) throws IOException {
@@ -344,8 +344,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
             if (writeBuffer != null) {
                 writeBuffer.complete();
                 RowDataRollingFileWriter writer = createRollingRowWriter();
-                try (InternalRowBuffer.InternalRowBufferIterator iterator =
-                        writeBuffer.newIterator()) {
+                try (RowBuffer.RowBufferIterator iterator = 
writeBuffer.newIterator()) {
                     while (iterator.advanceNext()) {
                         writer.write(iterator.getRow());
                     }
@@ -375,7 +374,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
         @Override
         public void setMemoryPool(MemorySegmentPool memoryPool) {
             writeBuffer =
-                    InternalRowBuffer.getBuffer(
+                    RowBuffer.getBuffer(
                             ioManager,
                             memoryPool,
                             new InternalRowSerializer(writeSchema),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 2503bf8e6..702e8914e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -38,7 +38,7 @@ import java.util.List;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** An external buffer for storing rows, it will spill the data to disk when 
the memory is full. */
-public class ExternalBuffer implements InternalRowBuffer {
+public class ExternalBuffer implements RowBuffer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ExternalBuffer.class);
 
@@ -78,6 +78,7 @@ public class ExternalBuffer implements InternalRowBuffer {
                 new InMemoryBuffer(pool, 
(AbstractRowDataSerializer<InternalRow>) serializer);
     }
 
+    @Override
     public void reset() {
         clearChannels();
         inMemoryBuffer.reset();
@@ -85,6 +86,7 @@ public class ExternalBuffer implements InternalRowBuffer {
         addCompleted = false;
     }
 
+    @Override
     public boolean put(InternalRow row) throws IOException {
         checkState(!addCompleted, "This buffer has add completed.");
         if (!inMemoryBuffer.put(row)) {
@@ -102,12 +104,13 @@ public class ExternalBuffer implements InternalRowBuffer {
         return true;
     }
 
+    @Override
     public void complete() {
         addCompleted = true;
     }
 
     @Override
-    public InternalRowBufferIterator newIterator() {
+    public RowBufferIterator newIterator() {
         checkState(addCompleted, "This buffer has not add completed.");
         return new BufferIterator();
     }
@@ -157,10 +160,12 @@ public class ExternalBuffer implements InternalRowBuffer {
         inMemoryBuffer.reset();
     }
 
+    @Override
     public int size() {
         return numRows;
     }
 
+    @Override
     public long memoryOccupancy() {
         return inMemoryBuffer.memoryOccupancy();
     }
@@ -181,7 +186,7 @@ public class ExternalBuffer implements InternalRowBuffer {
     }
 
     /** Iterator of external buffer. */
-    public class BufferIterator implements InternalRowBufferIterator {
+    public class BufferIterator implements RowBufferIterator {
 
         private MutableObjectIterator<BinaryRow> currentIterator;
         private final BinaryRow reuse = binaryRowSerializer.createInstance();
@@ -216,6 +221,7 @@ public class ExternalBuffer implements InternalRowBuffer {
             closed = true;
         }
 
+        @Override
         public boolean advanceNext() {
             checkValidity();
 
@@ -246,6 +252,7 @@ public class ExternalBuffer implements InternalRowBuffer {
             return true;
         }
 
+        @Override
         public BinaryRow getRow() {
             return row;
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
index 4758bb183..a9eaf55e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -32,7 +32,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 
 /** Only cache {@link InternalRow}s in memory. */
-public class InMemoryBuffer implements InternalRowBuffer {
+public class InMemoryBuffer implements RowBuffer {
+
     private final AbstractRowDataSerializer<InternalRow> serializer;
     private final ArrayList<MemorySegment> recordBufferSegments;
     private final SimpleCollectingOutputView recordCollector;
@@ -126,7 +127,7 @@ public class InMemoryBuffer implements InternalRowBuffer {
     }
 
     private static class InMemoryBufferIterator
-            implements InternalRowBufferIterator, 
MutableObjectIterator<BinaryRow> {
+            implements RowBufferIterator, MutableObjectIterator<BinaryRow> {
 
         private final RandomAccessInputView recordBuffer;
         private final AbstractRowDataSerializer<InternalRow> serializer;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java 
b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
similarity index 90%
rename from 
paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java
rename to paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
index 776db52aa..c5c881331 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/RowBuffer.java
@@ -27,7 +27,7 @@ import java.io.Closeable;
 import java.io.IOException;
 
 /** Cache buffer for {@link InternalRow}. */
-public interface InternalRowBuffer {
+public interface RowBuffer {
 
     boolean put(InternalRow row) throws IOException;
 
@@ -39,10 +39,10 @@ public interface InternalRowBuffer {
 
     void reset();
 
-    InternalRowBufferIterator newIterator();
+    RowBufferIterator newIterator();
 
     /** Iterator to fetch record from buffer. */
-    interface InternalRowBufferIterator extends Closeable {
+    interface RowBufferIterator extends Closeable {
 
         boolean advanceNext();
 
@@ -51,7 +51,7 @@ public interface InternalRowBuffer {
         void close();
     }
 
-    static InternalRowBuffer getBuffer(
+    static RowBuffer getBuffer(
             IOManager ioManager,
             MemorySegmentPool memoryPool,
             AbstractRowDataSerializer<InternalRow> serializer,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index ca1d65cae..dc8b74433 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -95,7 +95,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
         this.commitForceCompact = options.commitForceCompact();
         this.skipCompaction = options.writeOnly();
         this.fileCompression = options.fileCompression();
-        this.useWriteBuffer = options.useWriteBuffer();
+        this.useWriteBuffer = options.useWriteBufferForAppend();
         this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode);
         this.statsCollectors =
                 StatsCollectorFactories.createStatsFactories(options, 
rowType.getFieldNames());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 5699ea415..62a0292a8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -27,7 +27,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.ChannelWithMeta;
 import org.apache.paimon.disk.ExternalBuffer;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.InternalRowBuffer;
+import org.apache.paimon.disk.RowBuffer;
 import org.apache.paimon.format.FieldStats;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
@@ -329,7 +329,7 @@ public class AppendOnlyWriterTest {
             writer.write(row(j, String.valueOf(s), PART));
         }
 
-        InternalRowBuffer buffer = writer.getWriteBuffer();
+        RowBuffer buffer = writer.getWriteBuffer();
         Assertions.assertThat(buffer.size()).isEqualTo(100);
         
Assertions.assertThat(buffer.memoryOccupancy()).isLessThanOrEqualTo(16384L);
 
@@ -343,7 +343,7 @@ public class AppendOnlyWriterTest {
         // we give it a small Memory Pool, force it to spill
         writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024));
 
-        InternalRowBuffer buffer = writer.getWriteBuffer();
+        RowBuffer buffer = writer.getWriteBuffer();
         Assertions.assertThat(buffer).isNull();
 
         writer.close();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java 
b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
index 6ca73bd6a..53e530e91 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -168,14 +168,13 @@ public class ExternalBufferTest {
         buffer.put(row);
     }
 
-    private void assertBuffer(List<Long> expected, InternalRowBuffer buffer) {
-        InternalRowBuffer.InternalRowBufferIterator iterator = 
buffer.newIterator();
+    private void assertBuffer(List<Long> expected, RowBuffer buffer) {
+        RowBuffer.RowBufferIterator iterator = buffer.newIterator();
         assertBuffer(expected, iterator);
         iterator.close();
     }
 
-    private void assertBuffer(
-            List<Long> expected, InternalRowBuffer.InternalRowBufferIterator 
iterator) {
+    private void assertBuffer(List<Long> expected, RowBuffer.RowBufferIterator 
iterator) {
         List<Long> values = new ArrayList<>();
         while (iterator.advanceNext()) {
             values.add(iterator.getRow().getLong(0));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java 
b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
index e1b127714..9bcfdc6c6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
@@ -33,7 +33,7 @@ import java.util.Arrays;
 
 import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE;
 
-/** Tests for {@link InternalRowBuffer}. */
+/** Tests for {@link RowBuffer}. */
 public class InMemoryBufferTest {
 
     private InternalRowSerializer serializer;
@@ -87,7 +87,7 @@ public class InMemoryBufferTest {
         }
 
         Assertions.assertThat(buffer.size()).isEqualTo(100);
-        try (InternalRowBuffer.InternalRowBufferIterator iterator = 
buffer.newIterator()) {
+        try (RowBuffer.RowBufferIterator iterator = buffer.newIterator()) {
             while (iterator.advanceNext()) {
                 Assertions.assertThat(iterator.getRow()).isEqualTo(binaryRow);
             }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index fd36d4a39..d255cdfbb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -22,7 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.InternalRowBuffer;
+import org.apache.paimon.disk.RowBuffer;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
@@ -60,7 +60,7 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
     private final SerializableFunction<T, InternalRow> toRow;
     private final SerializableFunction<InternalRow, T> fromRow;
 
-    private transient InternalRowBuffer bootstrapBuffer;
+    private transient RowBuffer bootstrapBuffer;
 
     public GlobalIndexAssignerOperator(
             Table table,
@@ -89,7 +89,7 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
         long bufferSize = 
options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes();
         long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes();
         bootstrapBuffer =
-                InternalRowBuffer.getBuffer(
+                RowBuffer.getBuffer(
                         
IOManager.create(ioManager.getSpillingDirectoriesPaths()),
                         new HeapMemorySegmentPool(bufferSize, (int) pageSize),
                         new InternalRowSerializer(table.rowType()),
@@ -130,8 +130,7 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
     private void endBootstrap() throws Exception {
         if (bootstrapBuffer != null) {
             bootstrapBuffer.complete();
-            try (InternalRowBuffer.InternalRowBufferIterator iterator =
-                    bootstrapBuffer.newIterator()) {
+            try (RowBuffer.RowBufferIterator iterator = 
bootstrapBuffer.newIterator()) {
                 while (iterator.advanceNext()) {
                     assigner.process(fromRow.apply(iterator.getRow()));
                 }

Reply via email to