This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3048dbecf [core] Introduce write-buffer-for-append to writing lots of partitions (#1969)
3048dbecf is described below

commit 3048dbecfebf922f5e26e2bf36ac610b7c45609b
Author: YeJunHao <[email protected]>
AuthorDate: Tue Sep 12 19:58:35 2023 +0800

    [core] Introduce write-buffer-for-append to writing lots of partitions (#1969)
---
 docs/content/concepts/append-only-table.md         |  30 ++-
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  11 ++
 .../org/apache/paimon/append/AppendOnlyWriter.java | 202 +++++++++++++++++++--
 .../org/apache/paimon/disk/ExternalBuffer.java     | 136 ++------------
 .../org/apache/paimon/disk/InMemoryBuffer.java     | 176 ++++++++++++++++++
 .../org/apache/paimon/disk/InternalRowBuffer.java  |  65 +++++++
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  11 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java | 183 ++++++++++++++++++-
 .../org/apache/paimon/disk/ExternalBufferTest.java |  11 +-
 .../org/apache/paimon/disk/InMemoryBufferTest.java | 117 ++++++++++++
 .../apache/paimon/format/FileFormatSuffixTest.java |  10 +-
 .../sink/index/GlobalIndexAssignerOperator.java    |  16 +-
 13 files changed, 819 insertions(+), 155 deletions(-)

diff --git a/docs/content/concepts/append-only-table.md b/docs/content/concepts/append-only-table.md
index 3ae9d88b3..e8a87b699 100644
--- a/docs/content/concepts/append-only-table.md
+++ b/docs/content/concepts/append-only-table.md
@@ -27,7 +27,7 @@ under the License.
 # Append Only Table
 
 If a table does not have a primary key defined, it is an append-only table by default. Separated by the definition of bucket,
-we have two different append-only mode: "Append For Queue" and "Append For Scalable Table". 
+we have two different append-only mode: "Append For Queue" and "Append For Scalable Table".
 
 ## Append For Queue
 
@@ -276,4 +276,30 @@ CREATE TABLE MyTable (
 ```
 {{< /tab >}}
 
-{{< /tabs >}}
\ No newline at end of file
+{{< /tabs >}}
+
+## Multiple Partitions Write
+While writing multiple partitions in a single insert job, we may get an out-of-memory error if
+too many records arrive between two checkpoints.
+
+Version 0.6 introduces a `write-buffer-for-append` option for append-only tables. When this
+parameter is set to true, the writer caches records in a segment pool to avoid OOM.
+
+### Influence
+If we also set `write-buffer-spillable` to true, records can be serialized and spilled to disk.
+But this may delay checkpoint acknowledgement and slow down the sink.
+
+If we don't set `write-buffer-spillable`, then once the segment pool runs out of memory, the
+buffered records are flushed to the filesystem as complete data files. This may produce too many
+small files and put more pressure on compaction, so the trade-off needs to be weighed carefully.
+
+### Example
+```sql
+CREATE TABLE MyTable (
+    product_id BIGINT,
+    price DOUBLE,
+    sales BIGINT
+) WITH (
+    'write-buffer-for-append' = 'true'
+);
+```
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c3b877982..d327b9243 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -557,6 +557,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td>MemorySize</td>
             <td>Target size of a file.</td>
         </tr>
+        <tr>
+            <td><h5>write-buffer-for-append</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>This option only works for append-only tables. Whether the writer uses a write buffer to avoid out-of-memory errors.</td>
+        </tr>
         <tr>
             <td><h5>write-buffer-size</h5></td>
             <td style="word-wrap: break-word;">256 mb</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 7ded97bba..e4f233ef2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -277,6 +277,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether the write buffer can be spillable. 
Enabled by default when using object storage.");
 
+    public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
+            key("write-buffer-for-append")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "This option only works for append-only table. 
Whether the write use write buffer to avoid out-of-memory error.");
+
     public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
             key("write-manifest-cache")
                     .memoryType()
@@ -1007,6 +1014,10 @@ public class CoreOptions implements Serializable {
         return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
     }
 
+    public boolean useWriteBuffer() {
+        return options.get(WRITE_BUFFER_FOR_APPEND);
+    }
+
     public long sortSpillBufferSize() {
         return options.get(SORT_SPILL_BUFFER_SIZE).getBytes();
     }
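
As a quick illustration of how the new option surfaces through `CoreOptions` (an editor's sketch, not part of the commit; it assumes the `Map`-based `CoreOptions` constructor and the `ConfigOption.key()` accessor used elsewhere in this diff):

```java
import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import java.util.Map;

public class WriteBufferForAppendExample {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // the new option; useWriteBuffer() falls back to false when it is absent
        conf.put(CoreOptions.WRITE_BUFFER_FOR_APPEND.key(), "true");

        CoreOptions options = new CoreOptions(conf);
        System.out.println(options.useWriteBuffer()); // prints: true
    }
}
```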
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 84fd01d73..7b0909568 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -19,8 +19,12 @@
 
 package org.apache.paimon.append;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.InternalRowBuffer;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.CompactIncrement;
@@ -28,6 +32,8 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.NewFilesIncrement;
 import org.apache.paimon.io.RowDataRollingFileWriter;
+import org.apache.paimon.memory.MemoryOwner;
+import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -38,6 +44,7 @@ import org.apache.paimon.utils.RecordWriter;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -48,7 +55,7 @@ import java.util.concurrent.ExecutionException;
  * A {@link RecordWriter} implementation that only accepts records which are always insert
  * operations and don't have any unique keys or sort keys.
  */
-public class AppendOnlyWriter implements RecordWriter<InternalRow> {
+public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner {
 
     private final FileIO fileIO;
     private final long schemaId;
@@ -63,12 +70,14 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
     private final List<DataFileMeta> compactAfter;
     private final LongCounter seqNumCounter;
     private final String fileCompression;
+    private final boolean spillable;
+    private final SinkWriter sinkWriter;
     private final FieldStatsCollector.Factory[] statsCollectors;
-
-    private RowDataRollingFileWriter writer;
+    private final IOManager ioManager;
 
     public AppendOnlyWriter(
             FileIO fileIO,
+            IOManager ioManager,
             long schemaId,
             FileFormat fileFormat,
             long targetFileSize,
@@ -78,6 +87,8 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
             boolean forceCompact,
             DataFilePathFactory pathFactory,
             @Nullable CommitIncrement increment,
+            boolean useWriteBuffer,
+            boolean spillable,
             String fileCompression,
             FieldStatsCollector.Factory[] statsCollectors) {
         this.fileIO = fileIO;
@@ -93,9 +104,11 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
         this.compactAfter = new ArrayList<>();
         this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
         this.fileCompression = fileCompression;
+        this.spillable = spillable;
+        this.ioManager = ioManager;
         this.statsCollectors = statsCollectors;
 
-        this.writer = createRollingRowWriter();
+        sinkWriter = createSinkWrite(useWriteBuffer);
 
         if (increment != null) {
             newFiles.addAll(increment.newFilesIncrement().newFiles());
@@ -110,12 +123,22 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
                 rowData.getRowKind() == RowKind.INSERT,
                 "Append-only writer can only accept insert row kind, but 
current row kind is: %s",
                 rowData.getRowKind());
-        writer.write(rowData);
+        boolean success = sinkWriter.write(rowData);
+        if (!success) {
+            flush(false, false);
+            success = sinkWriter.write(rowData);
+            if (!success) {
+                // Should not get here, because the write buffer itself throws a
+                // too-big-record exception. We throw again in case something
+                // unexpected happens (e.g. the buffer implementation changes).
+                throw new RuntimeException("Mem table is too small to hold a single element.");
+            }
+        }
     }
 
     @Override
     public void compact(boolean fullCompaction) throws Exception {
-        flushWriter(true, fullCompaction);
+        flush(true, fullCompaction);
     }
 
     @Override
@@ -130,19 +153,14 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
 
     @Override
     public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception {
-        flushWriter(false, false);
+        flush(false, false);
         trySyncLatestCompaction(waitCompaction || forceCompact);
         return drainIncrement();
     }
 
-    private void flushWriter(boolean waitForLatestCompaction, boolean forcedFullCompaction)
+    private void flush(boolean waitForLatestCompaction, boolean forcedFullCompaction)
             throws Exception {
-        List<DataFileMeta> flushedFiles = new ArrayList<>();
-        if (writer != null) {
-            writer.close();
-            flushedFiles.addAll(writer.result());
-            writer = createRollingRowWriter();
-        }
+        List<DataFileMeta> flushedFiles = sinkWriter.flush();
 
         // add new generated files
         flushedFiles.forEach(compactManager::addNewFile);
@@ -169,9 +187,14 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
             fileIO.deleteQuietly(pathFactory.toPath(file.fileName()));
         }
 
-        if (writer != null) {
-            writer.abort();
-            writer = null;
+        sinkWriter.close();
+    }
+
+    private SinkWriter createSinkWrite(boolean useWriteBuffer) {
+        if (useWriteBuffer) {
+            return new BufferedSinkWriter();
+        } else {
+            return new DirectSinkWriter();
         }
     }
 
@@ -214,4 +237,149 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow> {
 
         return new CommitIncrement(newFilesIncrement, compactIncrement);
     }
+
+    @Override
+    public void setMemoryPool(MemorySegmentPool memoryPool) {
+        sinkWriter.setMemoryPool(memoryPool);
+    }
+
+    @Override
+    public long memoryOccupancy() {
+        return sinkWriter.memoryOccupancy();
+    }
+
+    @Override
+    public void flushMemory() throws Exception {
+        flush(false, false);
+    }
+
+    @VisibleForTesting
+    InternalRowBuffer getWriteBuffer() {
+        if (sinkWriter instanceof BufferedSinkWriter) {
+            return ((BufferedSinkWriter) sinkWriter).writeBuffer;
+        } else {
+            return null;
+        }
+    }
+
+    @VisibleForTesting
+    List<DataFileMeta> getNewFiles() {
+        return newFiles;
+    }
+
+    /** Internal interface to sink data from the input. */
+    interface SinkWriter {
+
+        boolean write(InternalRow data) throws IOException;
+
+        List<DataFileMeta> flush() throws IOException;
+
+        long memoryOccupancy();
+
+        void close();
+
+        void setMemoryPool(MemorySegmentPool memoryPool);
+    }
+
+    /**
+     * Directly sinks data to files without any in-memory cache, writing through the
+     * Orc/Parquet/etc. writers directly. May cause out-of-memory errors.
+     */
+    private class DirectSinkWriter implements SinkWriter {
+
+        private RowDataRollingFileWriter writer = createRollingRowWriter();
+
+        @Override
+        public boolean write(InternalRow data) throws IOException {
+            writer.write(data);
+            return true;
+        }
+
+        @Override
+        public List<DataFileMeta> flush() throws IOException {
+            List<DataFileMeta> flushedFiles = new ArrayList<>();
+            if (writer != null) {
+                writer.close();
+                flushedFiles.addAll(writer.result());
+                writer = createRollingRowWriter();
+            }
+            return flushedFiles;
+        }
+
+        @Override
+        public long memoryOccupancy() {
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            if (writer != null) {
+                writer.abort();
+                writer = null;
+            }
+        }
+
+        @Override
+        public void setMemoryPool(MemorySegmentPool memoryPool) {
+            // do nothing
+        }
+    }
+
+    /**
+     * Buffers records in segments taken from the segment pool. When spillable, it may delay
+     * checkpoint acknowledgement; when non-spillable, it may produce too many small files.
+     */
+    private class BufferedSinkWriter implements SinkWriter {
+
+        InternalRowBuffer writeBuffer;
+
+        @Override
+        public boolean write(InternalRow data) throws IOException {
+            return writeBuffer.put(data);
+        }
+
+        @Override
+        public List<DataFileMeta> flush() throws IOException {
+            List<DataFileMeta> flushedFiles = new ArrayList<>();
+            if (writeBuffer != null) {
+                writeBuffer.complete();
+                RowDataRollingFileWriter writer = createRollingRowWriter();
+                try (InternalRowBuffer.InternalRowBufferIterator iterator =
+                        writeBuffer.newIterator()) {
+                    while (iterator.advanceNext()) {
+                        writer.write(iterator.getRow());
+                    }
+                } finally {
+                    writer.close();
+                }
+                flushedFiles.addAll(writer.result());
+                // reuse writeBuffer
+                writeBuffer.reset();
+            }
+            return flushedFiles;
+        }
+
+        @Override
+        public long memoryOccupancy() {
+            return writeBuffer.memoryOccupancy();
+        }
+
+        @Override
+        public void close() {
+            if (writeBuffer != null) {
+                writeBuffer.reset();
+                writeBuffer = null;
+            }
+        }
+
+        @Override
+        public void setMemoryPool(MemorySegmentPool memoryPool) {
+            writeBuffer =
+                    InternalRowBuffer.getBuffer(
+                            ioManager,
+                            memoryPool,
+                            new InternalRowSerializer(writeSchema),
+                            spillable);
+        }
+    }
 }
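
Taken together, the writer now acts as a `MemoryOwner`: it must be handed a segment pool before any record arrives, a full buffer triggers flush-and-retry, and `prepareCommit()` drains buffered rows into data files. A hedged sketch of that call sequence (the writer construction is elided; `row(...)` is a hypothetical row factory standing in for the test helper used in `AppendOnlyWriterTest` later in this diff):

```java
// Sketch only: assumes `writer` was built with useWriteBuffer = true, as in
// AppendOnlyWriterTest below; row(...) is a hypothetical InternalRow factory.
CommitIncrement writeThroughBuffer(AppendOnlyWriter writer) throws Exception {
    // A MemoryOwner must receive its segment pool before the first write.
    writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024));

    for (int i = 0; i < 100; i++) {
        // Records land in the write buffer; a failed put() makes the writer
        // flush(false, false) and retry once before giving up.
        writer.write(row(i));
    }

    // prepareCommit() flushes the remaining buffered rows into data files.
    CommitIncrement increment = writer.prepareCommit(true);
    writer.close();
    return increment;
}
```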
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
index 9ced8e83d..2503bf8e6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -20,8 +20,6 @@ package org.apache.paimon.disk;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.RandomAccessInputView;
-import org.apache.paimon.data.SimpleCollectingOutputView;
 import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
 import org.apache.paimon.data.serializer.BinaryRowSerializer;
 import org.apache.paimon.memory.Buffer;
@@ -32,8 +30,6 @@ import org.apache.paimon.utils.MutableObjectIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,7 +38,7 @@ import java.util.List;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** An external buffer for storing rows, it will spill the data to disk when the memory is full. */
-public class ExternalBuffer {
+public class ExternalBuffer implements InternalRowBuffer {
 
     private static final Logger LOG = LoggerFactory.getLogger(ExternalBuffer.class);
 
@@ -59,7 +55,7 @@ public class ExternalBuffer {
 
     private boolean addCompleted;
 
-    public ExternalBuffer(
+    ExternalBuffer(
             IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer<?> serializer) {
         this.ioManager = ioManager;
         this.pool = pool;
@@ -79,7 +75,7 @@ public class ExternalBuffer {
 
         //noinspection unchecked
         this.inMemoryBuffer =
-                new InMemoryBuffer((AbstractRowDataSerializer<InternalRow>) serializer);
+                new InMemoryBuffer(pool, (AbstractRowDataSerializer<InternalRow>) serializer);
     }
 
     public void reset() {
@@ -89,33 +85,35 @@ public class ExternalBuffer {
         addCompleted = false;
     }
 
-    public void add(InternalRow row) throws IOException {
+    public boolean put(InternalRow row) throws IOException {
         checkState(!addCompleted, "This buffer has add completed.");
-        if (!inMemoryBuffer.write(row)) {
+        if (!inMemoryBuffer.put(row)) {
             // Check if record is too big.
             if (inMemoryBuffer.getCurrentDataBufferOffset() == 0) {
                 throwTooBigException(row);
             }
             spill();
-            if (!inMemoryBuffer.write(row)) {
+            if (!inMemoryBuffer.put(row)) {
                 throwTooBigException(row);
             }
         }
 
         numRows++;
+        return true;
     }
 
     public void complete() {
         addCompleted = true;
     }
 
-    public BufferIterator newIterator() {
+    @Override
+    public InternalRowBufferIterator newIterator() {
         checkState(addCompleted, "This buffer has not add completed.");
         return new BufferIterator();
     }
 
     private void throwTooBigException(InternalRow row) throws IOException {
-        int rowSize = inMemoryBuffer.serializer.toBinaryRow(row).toBytes().length;
+        int rowSize = inMemoryBuffer.getSerializer().toBinaryRow(row).toBytes().length;
         throw new IOException(
                 "Record is too big, it can't be added to a empty 
InMemoryBuffer! "
                         + "Record size: "
@@ -142,7 +140,7 @@ public class ExternalBuffer {
             }
             LOG.info(
                     "here spill the reset buffer data with {} records {} 
bytes",
-                    inMemoryBuffer.numRecords,
+                    inMemoryBuffer.size(),
                     writer.getSize());
             writer.close();
         } catch (IOException e) {
@@ -163,6 +161,10 @@ public class ExternalBuffer {
         return numRows;
     }
 
+    public long memoryOccupancy() {
+        return inMemoryBuffer.memoryOccupancy();
+    }
+
     private int memorySize() {
         return pool.freePages() * segmentSize;
     }
@@ -179,7 +181,7 @@ public class ExternalBuffer {
     }
 
     /** Iterator of external buffer. */
-    public class BufferIterator implements Closeable {
+    public class BufferIterator implements InternalRowBufferIterator {
 
         private MutableObjectIterator<BinaryRow> currentIterator;
         private final BinaryRow reuse = binaryRowSerializer.createInstance();
@@ -234,7 +236,7 @@ public class ExternalBuffer {
         }
 
         private boolean nextIterator() throws IOException {
-            if (currentChannelID == Integer.MAX_VALUE) {
+            if (currentChannelID == Integer.MAX_VALUE || numRows == 0) {
                 return false;
             } else if (currentChannelID < spilledChannelIDs.size() - 1) {
                 nextSpilledIterator();
@@ -275,109 +277,7 @@ public class ExternalBuffer {
     }
 
     @VisibleForTesting
-    List<ChannelWithMeta> getSpillChannels() {
+    public List<ChannelWithMeta> getSpillChannels() {
         return spilledChannelIDs;
     }
-
-    private class InMemoryBuffer {
-
-        private final AbstractRowDataSerializer<InternalRow> serializer;
-        private final ArrayList<MemorySegment> recordBufferSegments;
-        private final SimpleCollectingOutputView recordCollector;
-
-        private long currentDataBufferOffset;
-        private int numBytesInLastBuffer;
-        private int numRecords = 0;
-
-        private InMemoryBuffer(AbstractRowDataSerializer<InternalRow> serializer) {
-            // serializer has states, so we must duplicate
-            this.serializer = (AbstractRowDataSerializer<InternalRow>) serializer.duplicate();
-            this.recordBufferSegments = new ArrayList<>();
-            this.recordCollector =
-                    new SimpleCollectingOutputView(this.recordBufferSegments, pool, segmentSize);
-        }
-
-        private void reset() {
-            this.currentDataBufferOffset = 0;
-            this.numRecords = 0;
-            returnToSegmentPool();
-            this.recordCollector.reset();
-        }
-
-        private void returnToSegmentPool() {
-            pool.returnAll(this.recordBufferSegments);
-            this.recordBufferSegments.clear();
-        }
-
-        public boolean write(InternalRow row) throws IOException {
-            try {
-                this.serializer.serializeToPages(row, this.recordCollector);
-                currentDataBufferOffset = this.recordCollector.getCurrentOffset();
-                numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment();
-                numRecords++;
-                return true;
-            } catch (EOFException e) {
-                return false;
-            }
-        }
-
-        private ArrayList<MemorySegment> getRecordBufferSegments() {
-            return recordBufferSegments;
-        }
-
-        private long getCurrentDataBufferOffset() {
-            return currentDataBufferOffset;
-        }
-
-        private int getNumRecordBuffers() {
-            int result = (int) (currentDataBufferOffset / segmentSize);
-            long mod = currentDataBufferOffset % segmentSize;
-            if (mod != 0) {
-                result += 1;
-            }
-            return result;
-        }
-
-        private int getNumBytesInLastBuffer() {
-            return numBytesInLastBuffer;
-        }
-
-        private InMemoryBufferIterator newIterator() {
-            RandomAccessInputView recordBuffer =
-                    new RandomAccessInputView(
-                            this.recordBufferSegments, segmentSize, numBytesInLastBuffer);
-            return new InMemoryBufferIterator(recordBuffer, serializer);
-        }
-    }
-
-    private static class InMemoryBufferIterator
-            implements MutableObjectIterator<BinaryRow>, Closeable {
-
-        private final RandomAccessInputView recordBuffer;
-        private final AbstractRowDataSerializer<InternalRow> serializer;
-
-        private InMemoryBufferIterator(
-                RandomAccessInputView recordBuffer,
-                AbstractRowDataSerializer<InternalRow> serializer) {
-            this.recordBuffer = recordBuffer;
-            this.serializer = serializer;
-        }
-
-        @Override
-        public BinaryRow next(BinaryRow reuse) throws IOException {
-            try {
-                return (BinaryRow) serializer.mapFromPages(reuse, recordBuffer);
-            } catch (EOFException e) {
-                return null;
-            }
-        }
-
-        @Override
-        public BinaryRow next() throws IOException {
-            throw new RuntimeException("Not support!");
-        }
-
-        @Override
-        public void close() {}
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
new file mode 100644
index 000000000..4758bb183
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InMemoryBuffer.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.disk;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** Only cache {@link InternalRow}s in memory. */
+public class InMemoryBuffer implements InternalRowBuffer {
+    private final AbstractRowDataSerializer<InternalRow> serializer;
+    private final ArrayList<MemorySegment> recordBufferSegments;
+    private final SimpleCollectingOutputView recordCollector;
+    private final MemorySegmentPool pool;
+    private final int segmentSize;
+
+    private long currentDataBufferOffset;
+    private int numBytesInLastBuffer;
+    private int numRecords = 0;
+
+    InMemoryBuffer(MemorySegmentPool pool, AbstractRowDataSerializer<InternalRow> serializer) {
+        // serializer has states, so we must duplicate
+        this.serializer = (AbstractRowDataSerializer<InternalRow>) serializer.duplicate();
+        this.pool = pool;
+        this.segmentSize = pool.pageSize();
+        this.recordBufferSegments = new ArrayList<>();
+        this.recordCollector =
+                new SimpleCollectingOutputView(this.recordBufferSegments, pool, segmentSize);
+    }
+
+    @Override
+    public void reset() {
+        this.currentDataBufferOffset = 0;
+        this.numRecords = 0;
+        returnToSegmentPool();
+        this.recordCollector.reset();
+    }
+
+    private void returnToSegmentPool() {
+        pool.returnAll(this.recordBufferSegments);
+        this.recordBufferSegments.clear();
+    }
+
+    @Override
+    public boolean put(InternalRow row) throws IOException {
+        try {
+            this.serializer.serializeToPages(row, this.recordCollector);
+            currentDataBufferOffset = this.recordCollector.getCurrentOffset();
+            numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment();
+            numRecords++;
+            return true;
+        } catch (EOFException e) {
+            return false;
+        }
+    }
+
+    @Override
+    public int size() {
+        return numRecords;
+    }
+
+    @Override
+    public void complete() {}
+
+    @Override
+    public long memoryOccupancy() {
+        return currentDataBufferOffset;
+    }
+
+    @Override
+    public InMemoryBufferIterator newIterator() {
+        RandomAccessInputView recordBuffer =
+                new RandomAccessInputView(
+                        this.recordBufferSegments, segmentSize, numBytesInLastBuffer);
+        return new InMemoryBufferIterator(recordBuffer, serializer);
+    }
+
+    ArrayList<MemorySegment> getRecordBufferSegments() {
+        return recordBufferSegments;
+    }
+
+    long getCurrentDataBufferOffset() {
+        return currentDataBufferOffset;
+    }
+
+    int getNumRecordBuffers() {
+        int result = (int) (currentDataBufferOffset / segmentSize);
+        long mod = currentDataBufferOffset % segmentSize;
+        if (mod != 0) {
+            result += 1;
+        }
+        return result;
+    }
+
+    int getNumBytesInLastBuffer() {
+        return numBytesInLastBuffer;
+    }
+
+    AbstractRowDataSerializer<InternalRow> getSerializer() {
+        return serializer;
+    }
+
+    private static class InMemoryBufferIterator
+            implements InternalRowBufferIterator, MutableObjectIterator<BinaryRow> {
+
+        private final RandomAccessInputView recordBuffer;
+        private final AbstractRowDataSerializer<InternalRow> serializer;
+        private final BinaryRow reuse;
+        private BinaryRow row;
+
+        private InMemoryBufferIterator(
+                RandomAccessInputView recordBuffer,
+                AbstractRowDataSerializer<InternalRow> serializer) {
+            this.recordBuffer = recordBuffer;
+            this.serializer = serializer;
+            this.reuse = new BinaryRow(serializer.getArity());
+        }
+
+        @Override
+        public boolean advanceNext() {
+            try {
+                row = next(reuse);
+                return row != null;
+            } catch (IOException ioException) {
+                throw new RuntimeException(ioException);
+            }
+        }
+
+        @Override
+        public BinaryRow getRow() {
+            return row;
+        }
+
+        @Override
+        public BinaryRow next(BinaryRow reuse) throws IOException {
+            try {
+                return (BinaryRow) serializer.mapFromPages(reuse, recordBuffer);
+            } catch (EOFException e) {
+                return null;
+            }
+        }
+
+        @Override
+        public BinaryRow next() throws IOException {
+            return next(reuse);
+        }
+
+        @Override
+        public void close() {}
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java
new file mode 100644
index 000000000..776db52aa
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/InternalRowBuffer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.disk;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
+import org.apache.paimon.memory.MemorySegmentPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** Cache buffer for {@link InternalRow}. */
+public interface InternalRowBuffer {
+
+    boolean put(InternalRow row) throws IOException;
+
+    int size();
+
+    long memoryOccupancy();
+
+    void complete();
+
+    void reset();
+
+    InternalRowBufferIterator newIterator();
+
+    /** Iterator to fetch record from buffer. */
+    interface InternalRowBufferIterator extends Closeable {
+
+        boolean advanceNext();
+
+        BinaryRow getRow();
+
+        void close();
+    }
+
+    static InternalRowBuffer getBuffer(
+            IOManager ioManager,
+            MemorySegmentPool memoryPool,
+            AbstractRowDataSerializer<InternalRow> serializer,
+            boolean spillable) {
+        if (spillable) {
+            return new ExternalBuffer(ioManager, memoryPool, serializer);
+        } else {
+            return new InMemoryBuffer(memoryPool, serializer);
+        }
+    }
+}
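
To see the new abstraction end to end, here is a hedged lifecycle sketch (put, complete, iterate, reset) built only from the interface above and the constructors used in the tests later in this diff; the spill directory path is illustrative:

```java
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.InternalRowBuffer;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.types.DataTypes;

public class InternalRowBufferExample {
    public static void main(String[] args) throws Exception {
        // spillable = true yields an ExternalBuffer, false an InMemoryBuffer.
        InternalRowBuffer buffer =
                InternalRowBuffer.getBuffer(
                        IOManager.create("/tmp/paimon-spill"),
                        new HeapMemorySegmentPool(16384L, 1024),
                        new InternalRowSerializer(DataTypes.INT()),
                        true);

        // put() returns false only for the non-spillable buffer when the pool is full.
        for (int i = 0; i < 100; i++) {
            buffer.put(GenericRow.of(i));
        }

        buffer.complete(); // no more writes; required before newIterator()
        try (InternalRowBuffer.InternalRowBufferIterator it = buffer.newIterator()) {
            while (it.advanceNext()) {
                int value = it.getRow().getInt(0); // rows come back as BinaryRow
            }
        }
        buffer.reset(); // return segments to the pool for reuse
    }
}
```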
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 784fdfb65..ca1d65cae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -52,7 +52,7 @@ import java.util.concurrent.ExecutorService;
 import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
 
 /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
-public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow> {
+public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow> {
 
     private final FileIO fileIO;
     private final AppendOnlyFileStoreRead read;
@@ -65,6 +65,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
     private final int compactionMaxFileNum;
     private final boolean commitForceCompact;
     private final String fileCompression;
+    private final boolean useWriteBuffer;
+    private final boolean spillable;
     private final FieldStatsCollector.Factory[] statsCollectors;
 
     private boolean skipCompaction;
@@ -80,7 +82,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options) {
-        super(commitUser, snapshotManager, scan, null);
+        super(commitUser, snapshotManager, scan, options, null);
         this.fileIO = fileIO;
         this.read = read;
         this.schemaId = schemaId;
@@ -93,6 +95,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
         this.commitForceCompact = options.commitForceCompact();
         this.skipCompaction = options.writeOnly();
         this.fileCompression = options.fileCompression();
+        this.useWriteBuffer = options.useWriteBuffer();
+        this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), isStreamingMode);
         this.statsCollectors =
                 StatsCollectorFactories.createStatsFactories(options, rowType.getFieldNames());
     }
@@ -121,6 +125,7 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
 
         return new AppendOnlyWriter(
                 fileIO,
+                ioManager,
                 schemaId,
                 fileFormat,
                 targetFileSize,
@@ -130,6 +135,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<InternalRow
                 commitForceCompact,
                 factory,
                 restoreIncrement,
+                useWriteBuffer,
+                spillable,
                 fileCompression,
                 statsCollectors);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 680e483e2..5699ea415 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -24,12 +24,17 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.ChannelWithMeta;
+import org.apache.paimon.disk.ExternalBuffer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.InternalRowBuffer;
 import org.apache.paimon.format.FieldStats;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.stats.FieldStatsArraySerializer;
 import org.apache.paimon.types.DataType;
@@ -42,12 +47,15 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.StatsCollectorFactories;
 
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -306,6 +314,137 @@ public class AppendOnlyWriterTest {
         assertThat(afterClosedUnexpectedly).containsExactlyInAnyOrderElementsOf(committedFiles);
     }
 
+    @Test
+    public void testExternalBufferWorks() throws Exception {
+        AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
+
+        // give the writer a small memory pool to force spilling
+        writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024));
+
+        char[] s = new char[990];
+        Arrays.fill(s, 'a');
+
+        // write records whose total size is much larger than the pool capacity
+        for (int j = 0; j < 100; j++) {
+            writer.write(row(j, String.valueOf(s), PART));
+        }
+
+        InternalRowBuffer buffer = writer.getWriteBuffer();
+        Assertions.assertThat(buffer.size()).isEqualTo(100);
+        Assertions.assertThat(buffer.memoryOccupancy()).isLessThanOrEqualTo(16384L);
+
+        writer.close();
+    }
+
+    @Test
+    public void testNoBuffer() throws Exception {
+        AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE);
+
+        // a memory pool is set, but the direct writer creates no write buffer
+        writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024));
+
+        InternalRowBuffer buffer = writer.getWriteBuffer();
+        Assertions.assertThat(buffer).isNull();
+
+        writer.close();
+    }
+
+    @Test
+    public void testMultipleFlush() throws Exception {
+        AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
+
+        // give the writer a small memory pool to force spilling
+        writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024));
+
+        char[] s = new char[990];
+        Arrays.fill(s, 'a');
+
+        // write records whose total size is much larger than the pool capacity
+        for (int j = 0; j < 100; j++) {
+            writer.write(row(j, String.valueOf(s), PART));
+        }
+
+        writer.flushMemory();
+        Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L);
+        Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0);
+        Assertions.assertThat(writer.getNewFiles().size()).isGreaterThan(0);
+        long rowCount =
+                writer.getNewFiles().stream().map(DataFileMeta::rowCount).reduce(0L, Long::sum);
+        Assertions.assertThat(rowCount).isEqualTo(100);
+
+        for (int j = 0; j < 100; j++) {
+            writer.write(row(j, String.valueOf(s), PART));
+        }
+        writer.flushMemory();
+
+        Assertions.assertThat(writer.memoryOccupancy()).isEqualTo(0L);
+        Assertions.assertThat(writer.getWriteBuffer().size()).isEqualTo(0);
+        Assertions.assertThat(writer.getNewFiles().size()).isGreaterThan(1);
+        rowCount = writer.getNewFiles().stream().map(DataFileMeta::rowCount).reduce(0L, Long::sum);
+        Assertions.assertThat(rowCount).isEqualTo(200);
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
+
+        // give the writer a small memory pool to force spilling
+        writer.setMemoryPool(new HeapMemorySegmentPool(16384L, 1024));
+
+        char[] s = new char[990];
+        Arrays.fill(s, 'a');
+
+        // write records whose total size is much larger than the pool capacity
+        for (int j = 0; j < 100; j++) {
+            writer.write(row(j, String.valueOf(s), PART));
+        }
+
+        ExternalBuffer externalBuffer = (ExternalBuffer) writer.getWriteBuffer();
+        List<ChannelWithMeta> channel = externalBuffer.getSpillChannels();
+
+        writer.close();
+
+        for (ChannelWithMeta meta : channel) {
+            File file = new File(meta.getChannel().getPath());
+            Assertions.assertThat(file.exists()).isEqualTo(false);
+        }
+        Assertions.assertThat(writer.getWriteBuffer()).isEqualTo(null);
+    }
+
+    @Test
+    public void testNonSpillable() throws Exception {
+        AppendOnlyWriter writer = createEmptyWriter(Long.MAX_VALUE, true);
+
+        // give the writer a small memory pool to force spilling
+        writer.setMemoryPool(new HeapMemorySegmentPool(2048L, 1024));
+
+        char[] s = new char[990];
+        Arrays.fill(s, 'a');
+
+        // write records whose total size is much larger than the pool capacity
+        for (int j = 0; j < 100; j++) {
+            writer.write(row(j, String.valueOf(s), PART));
+        }
+        // we got only one file after commit
+        Assertions.assertThat(writer.prepareCommit(true).newFilesIncrement().newFiles().size())
+                .isEqualTo(1);
+        writer.close();
+
+        writer = createEmptyWriter(Long.MAX_VALUE, false);
+
+        // a small memory pool again, but non-spillable: the writer flushes files instead
+        writer.setMemoryPool(new HeapMemorySegmentPool(2048L, 1024));
+
+        // write records whose total size is much larger than the pool capacity
+        for (int j = 0; j < 100; j++) {
+            writer.write(row(j, String.valueOf(s), PART));
+        }
+        // we got 100 files
+        Assertions.assertThat(writer.prepareCommit(true).newFilesIncrement().newFiles().size())
+                .isEqualTo(100);
+        writer.close();
+    }
+
     private FieldStats initStats(Integer min, Integer max, long nullCount) {
         return new FieldStats(min, max, nullCount);
     }
@@ -328,17 +467,48 @@ public class AppendOnlyWriterTest {
     }
 
     private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
-        return createWriter(targetFileSize, false, Collections.emptyList()).getLeft();
+        return createWriter(targetFileSize, false, false, false, Collections.emptyList()).getLeft();
+    }
+
+    private AppendOnlyWriter createEmptyWriter(long targetFileSize, boolean spillable) {
+        return createWriter(targetFileSize, false, true, spillable, Collections.emptyList())
+                .getLeft();
     }
 
     private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
             long targetFileSize, boolean forceCompact, List<DataFileMeta> scannedFiles) {
-        return createWriter(targetFileSize, forceCompact, scannedFiles, new CountDownLatch(0));
+        return createWriter(
+                targetFileSize, forceCompact, true, true, scannedFiles, new CountDownLatch(0));
+    }
+
+    private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
+            long targetFileSize,
+            boolean forceCompact,
+            boolean useWriteBuffer,
+            boolean spillable,
+            List<DataFileMeta> scannedFiles) {
+        return createWriter(
+                targetFileSize,
+                forceCompact,
+                useWriteBuffer,
+                spillable,
+                scannedFiles,
+                new CountDownLatch(0));
+    }
+
+    private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
+            long targetFileSize,
+            boolean forceCompact,
+            List<DataFileMeta> scannedFiles,
+            CountDownLatch latch) {
+        return createWriter(targetFileSize, forceCompact, false, false, scannedFiles, latch);
     }
 
     private Pair<AppendOnlyWriter, List<DataFileMeta>> createWriter(
             long targetFileSize,
             boolean forceCompact,
+            boolean useWriteBuffer,
+            boolean spillable,
             List<DataFileMeta> scannedFiles,
             CountDownLatch latch) {
         FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Options());
@@ -358,9 +528,11 @@ public class AppendOnlyWriterTest {
                                     : Collections.singletonList(
                                             generateCompactAfter(compactBefore));
                         });
+        CoreOptions options = new CoreOptions(new HashMap<>());
         AppendOnlyWriter writer =
                 new AppendOnlyWriter(
                         LocalFileIO.create(),
+                        IOManager.create(tempDir.toString()),
                         SCHEMA_ID,
                         fileFormat,
                         targetFileSize,
@@ -370,10 +542,13 @@ public class AppendOnlyWriterTest {
                         forceCompact,
                         pathFactory,
                         null,
+                        useWriteBuffer,
+                        spillable,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
                         StatsCollectorFactories.createStatsFactories(
-                                new CoreOptions(new HashMap<>()),
-                                AppendOnlyWriterTest.SCHEMA.getFieldNames()));
+                                options, AppendOnlyWriterTest.SCHEMA.getFieldNames()));
+        writer.setMemoryPool(
+                new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
         return Pair.of(writer, compactManager.allFiles());
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
index 5bd9ad511..6ca73bd6a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -165,16 +165,17 @@ public class ExternalBufferTest {
         RandomDataGenerator random = new RandomDataGenerator();
         writer.writeString(0, BinaryString.fromString(random.nextHexString(500000)));
         writer.complete();
-        buffer.add(row);
+        buffer.put(row);
     }
 
-    private void assertBuffer(List<Long> expected, ExternalBuffer buffer) {
-        ExternalBuffer.BufferIterator iterator = buffer.newIterator();
+    private void assertBuffer(List<Long> expected, InternalRowBuffer buffer) {
+        InternalRowBuffer.InternalRowBufferIterator iterator = buffer.newIterator();
         assertBuffer(expected, iterator);
         iterator.close();
     }
 
-    private void assertBuffer(List<Long> expected, ExternalBuffer.BufferIterator iterator) {
+    private void assertBuffer(
+            List<Long> expected, InternalRowBuffer.InternalRowBufferIterator iterator) {
         List<Long> values = new ArrayList<>();
         while (iterator.advanceNext()) {
             values.add(iterator.getRow().getLong(0));
@@ -203,7 +204,7 @@ public class ExternalBufferTest {
         writer.reset();
         writer.writeLong(0, l);
         writer.complete();
-        buffer.add(row);
+        buffer.put(row);
         return l;
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
new file mode 100644
index 000000000..e1b127714
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/InMemoryBufferTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.disk;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE;
+
+/** Tests for {@link InternalRowBuffer}. */
+public class InMemoryBufferTest {
+
+    private InternalRowSerializer serializer;
+
+    @BeforeEach
+    public void before() {
+        this.serializer = new InternalRowSerializer(DataTypes.STRING());
+    }
+
+    @Test
+    public void testNonSpill() throws Exception {
+        InMemoryBuffer buffer =
+                new InMemoryBuffer(
+                        new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
+                        this.serializer);
+
+        BinaryRow binaryRow = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+        byte[] s = new byte[20 * 1024];
+        Arrays.fill(s, (byte) 'a');
+        binaryRowWriter.writeString(0, BinaryString.fromBytes(s));
+        binaryRowWriter.complete();
+
+        boolean result = buffer.put(binaryRow);
+        Assertions.assertThat(result).isTrue();
+        result = buffer.put(binaryRow);
+        Assertions.assertThat(result).isTrue();
+        result = buffer.put(binaryRow);
+        Assertions.assertThat(result).isTrue();
+        result = buffer.put(binaryRow);
+        Assertions.assertThat(result).isFalse();
+    }
+
+    @Test
+    public void testPutRead() throws Exception {
+        InMemoryBuffer buffer =
+                new InMemoryBuffer(
+                        new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
+                        this.serializer);
+
+        BinaryRow binaryRow = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+        byte[] s = new byte[10];
+        Arrays.fill(s, (byte) 'a');
+        binaryRowWriter.writeString(0, BinaryString.fromBytes(s));
+        binaryRowWriter.complete();
+        for (int i = 0; i < 100; i++) {
+            buffer.put(binaryRow.copy());
+        }
+
+        Assertions.assertThat(buffer.size()).isEqualTo(100);
+        try (InternalRowBuffer.InternalRowBufferIterator iterator = buffer.newIterator()) {
+            while (iterator.advanceNext()) {
+                Assertions.assertThat(iterator.getRow()).isEqualTo(binaryRow);
+            }
+        }
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        InMemoryBuffer buffer =
+                new InMemoryBuffer(
+                        new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
+                        this.serializer);
+
+        BinaryRow binaryRow = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+        byte[] s = new byte[10];
+        Arrays.fill(s, (byte) 'a');
+        binaryRowWriter.writeString(0, BinaryString.fromBytes(s));
+        binaryRowWriter.complete();
+        buffer.put(binaryRow.copy());
+
+        Assertions.assertThat(buffer.memoryOccupancy()).isGreaterThan(0);
+        buffer.reset();
+        Assertions.assertThat(buffer.memoryOccupancy()).isEqualTo(0);
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 9e88548d7..3404214d1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -23,12 +23,14 @@ import org.apache.paimon.append.AppendOnlyCompactManager;
 import org.apache.paimon.append.AppendOnlyWriter;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.KeyValueFileReadWriteTest;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.IntType;
@@ -64,9 +66,11 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
                 new DataFilePathFactory(new Path(tempDir.toString()), "dt=1", 1, format);
         FileFormat fileFormat = FileFormat.fromIdentifier(format, new Options());
         LinkedList<DataFileMeta> toCompact = new LinkedList<>();
+        CoreOptions options = new CoreOptions(new HashMap<>());
         AppendOnlyWriter appendOnlyWriter =
                 new AppendOnlyWriter(
                         LocalFileIO.create(),
+                        IOManager.create(tempDir.toString()),
                         0,
                         fileFormat,
                         10,
@@ -76,9 +80,13 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
                         false,
                         dataFilePathFactory,
                         null,
+                        false,
+                        false,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
                         StatsCollectorFactories.createStatsFactories(
-                                new CoreOptions(new HashMap<>()), SCHEMA.getFieldNames()));
+                                options, SCHEMA.getFieldNames()));
+        appendOnlyWriter.setMemoryPool(
+                new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
         appendOnlyWriter.write(
                 GenericRow.of(1, BinaryString.fromString("aaa"), 
BinaryString.fromString("1")));
         CommitIncrement increment = appendOnlyWriter.prepareCommit(true);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 0c10b01ff..fd36d4a39 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -21,8 +21,8 @@ package org.apache.paimon.flink.sink.index;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.disk.ExternalBuffer;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.InternalRowBuffer;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
@@ -60,7 +60,7 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
     private final SerializableFunction<T, InternalRow> toRow;
     private final SerializableFunction<InternalRow, T> fromRow;
 
-    private transient ExternalBuffer bootstrapBuffer;
+    private transient InternalRowBuffer bootstrapBuffer;
 
     public GlobalIndexAssignerOperator(
             Table table,
@@ -89,10 +89,11 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
         long bufferSize = options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes();
         long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes();
         bootstrapBuffer =
-                new ExternalBuffer(
+                InternalRowBuffer.getBuffer(
                         IOManager.create(ioManager.getSpillingDirectoriesPaths()),
                         new HeapMemorySegmentPool(bufferSize, (int) pageSize),
-                        new InternalRowSerializer(table.rowType()));
+                        new InternalRowSerializer(table.rowType()),
+                        true);
     }
 
     @Override
@@ -106,7 +107,9 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
                 break;
             case ROW:
                 if (bootstrapBuffer != null) {
-                    bootstrapBuffer.add(toRow.apply(value));
+                    // Ignore the return value: spilling is always enabled for
+                    // bootstrapBuffer, so put() always returns true.
+                    bootstrapBuffer.put(toRow.apply(value));
                 } else {
                     assigner.process(value);
                 }
@@ -127,7 +130,8 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
     private void endBootstrap() throws Exception {
         if (bootstrapBuffer != null) {
             bootstrapBuffer.complete();
-            try (ExternalBuffer.BufferIterator iterator = bootstrapBuffer.newIterator()) {
+            try (InternalRowBuffer.InternalRowBufferIterator iterator =
+                    bootstrapBuffer.newIterator()) {
                 while (iterator.advanceNext()) {
                     assigner.process(fromRow.apply(iterator.getRow()));
                 }
