This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d738a9b8a [format] Introduce 'write.batch-memory' to control memory in arrow (#5988)
3d738a9b8a is described below

commit 3d738a9b8a7ed7260e4d59c400a053d8bdfc845e
Author: YeJunHao <41894543+leaves12...@users.noreply.github.com>
AuthorDate: Thu Jul 31 12:10:22 2025 +0800

    [format] Introduce 'write.batch-memory' to control memory in arrow (#5988)
---
 .../shortcodes/generated/core_configuration.html   |  6 +++
 .../main/java/org/apache/paimon/CoreOptions.java   |  6 +++
 .../paimon/arrow/vector/ArrowFormatCWriter.java    | 14 +++++-
 .../paimon/arrow/vector/ArrowFormatWriter.java     | 43 +++++++++++++++--
 .../paimon/arrow/vector/ArrowFormatWriterTest.java | 54 ++++++++++++++++++++++
 .../java/org/apache/paimon/format/FileFormat.java  |  1 +
 .../apache/paimon/format/FileFormatFactory.java    | 18 +++++++-
 .../format/parquet/ParquetFileFormatTest.java      |  4 +-
 .../paimon/format/lance/LanceFileFormat.java       |  7 ++-
 9 files changed, 144 insertions(+), 9 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 85fa54aa63..c287901335 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1248,6 +1248,12 @@ If the data size allocated for the sorting task is uneven,which may lead to perf
             <td>Boolean</td>
             <td>If set to true, compactions and snapshot expiration will be 
skipped. This option is used along with dedicated compact jobs.</td>
         </tr>
+        <tr>
+            <td><h5>write.batch-memory</h5></td>
+            <td style="word-wrap: break-word;">128 mb</td>
+            <td>MemorySize</td>
+            <td>Write batch memory for any file format if it supports.</td>
+        </tr>
         <tr>
             <td><h5>write.batch-size</h5></td>
             <td style="word-wrap: break-word;">1024</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ca2c67d0f4..54802731b4 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1179,6 +1179,12 @@ public class CoreOptions implements Serializable {
                     .withFallbackKeys("orc.write.batch-size")
                     .withDescription("Write batch size for any file format if 
it supports.");
 
+    public static final ConfigOption<MemorySize> WRITE_BATCH_MEMORY =
+            key("write.batch-memory")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("128 mb"))
+                    .withDescription("Write batch memory for any file format 
if it supports.");
+
     public static final ConfigOption<String> CONSUMER_ID =
             key("consumer-id")
                     .stringType()
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
index 2f1e2f2a53..afa58250c3 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
@@ -27,6 +27,8 @@ import org.apache.arrow.c.ArrowSchema;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 
+import javax.annotation.Nullable;
+
 /**
  * This writer could flush to c struct, but you need to release it, except it 
has been released in c
  * code.
@@ -38,12 +40,20 @@ public class ArrowFormatCWriter implements AutoCloseable {
     private final ArrowFormatWriter realWriter;
 
     public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean 
caseSensitive) {
-        this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive));
+        this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, 
null));
+    }
+
+    public ArrowFormatCWriter(
+            RowType rowType,
+            int writeBatchSize,
+            boolean caseSensitive,
+            @Nullable Long memoryUsedMaxInVSR) {
+        this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, 
memoryUsedMaxInVSR));
     }
 
     public ArrowFormatCWriter(
             RowType rowType, int writeBatchSize, boolean caseSensitive, 
BufferAllocator allocator) {
-        this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, 
allocator));
+        this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, 
allocator, null));
     }
 
     private ArrowFormatCWriter(ArrowFormatWriter arrowFormatWriter) {
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index a957fa2288..44c764804b 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -27,11 +27,14 @@ import org.apache.paimon.types.RowType;
 
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.util.OversizedAllocationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 /** Write from {@link InternalRow} to {@link VectorSchemaRoot}. */
 public class ArrowFormatWriter implements AutoCloseable {
 
@@ -39,18 +42,29 @@ public class ArrowFormatWriter implements AutoCloseable {
 
     private final VectorSchemaRoot vectorSchemaRoot;
     private final ArrowFieldWriter[] fieldWriters;
-
     private final int batchSize;
-
     private final BufferAllocator allocator;
+    @Nullable private final Long memoryUsedMaxInBytes;
     private int rowId;
 
     public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean 
caseSensitive) {
-        this(rowType, writeBatchSize, caseSensitive, new RootAllocator());
+        this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), 
null);
     }
 
     public ArrowFormatWriter(
-            RowType rowType, int writeBatchSize, boolean caseSensitive, 
BufferAllocator allocator) {
+            RowType rowType,
+            int writeBatchSize,
+            boolean caseSensitive,
+            @Nullable Long memoryUsedMaxInBytes) {
+        this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), 
memoryUsedMaxInBytes);
+    }
+
+    public ArrowFormatWriter(
+            RowType rowType,
+            int writeBatchSize,
+            boolean caseSensitive,
+            BufferAllocator allocator,
+            @Nullable Long memoryUsedMaxInBytes) {
         this.allocator = allocator;
 
         vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, 
allocator, caseSensitive);
@@ -65,6 +79,7 @@ public class ArrowFormatWriter implements AutoCloseable {
         }
 
         this.batchSize = writeBatchSize;
+        this.memoryUsedMaxInBytes = memoryUsedMaxInBytes;
     }
 
     public void flush() {
@@ -75,6 +90,17 @@ public class ArrowFormatWriter implements AutoCloseable {
         if (rowId >= batchSize) {
             return false;
         }
+        if (memoryUsedMaxInBytes != null && rowId % 32 == 0) {
+            long memoryUsed = memoryUsed();
+            if (memoryUsed > memoryUsedMaxInBytes) {
+                LOG.debug(
+                        "Memory used by ArrowFormatCWriter exceeds the limit: 
{} > {} while writing record row id: {}",
+                        memoryUsed,
+                        memoryUsedMaxInBytes,
+                        rowId);
+                return false;
+            }
+        }
         for (int i = 0; i < currentRow.getFieldCount(); i++) {
             try {
                 fieldWriters[i].write(rowId, currentRow, i);
@@ -89,6 +115,15 @@ public class ArrowFormatWriter implements AutoCloseable {
         return true;
     }
 
+    public long memoryUsed() {
+        vectorSchemaRoot.setRowCount(rowId);
+        long memoryUsed = 0;
+        for (FieldVector fieldVector : vectorSchemaRoot.getFieldVectors()) {
+            memoryUsed += fieldVector.getBufferSize();
+        }
+        return memoryUsed;
+    }
+
     public boolean empty() {
         return rowId == 0;
     }
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index d7e857c111..d7ee8ca1a5 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
@@ -165,6 +166,59 @@ public class ArrowFormatWriterTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testWriteWithMemoryLimit(boolean limitMemory) {
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new DataField(0, "f0", DataTypes.BYTES()),
+                                new DataField(1, "f1", DataTypes.BYTES())));
+        Long memoryLimit = limitMemory ? 100 * 1024 * 1024L : null;
+        try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 4096, 
true, memoryLimit)) {
+
+            GenericRow genericRow = new GenericRow(2);
+            genericRow.setField(0, randomBytes(1024 * 1024, 1024 * 1024));
+            genericRow.setField(1, randomBytes(1024 * 1024, 1024 * 1024));
+
+            // normal write
+            for (int i = 0; i < 200; i++) {
+                boolean success = writer.write(genericRow);
+                if (!success) {
+                    writer.flush();
+                    writer.reset();
+                    writer.write(genericRow);
+                }
+            }
+            writer.reset();
+
+            if (limitMemory) {
+                for (int i = 0; i < 64; i++) {
+                    Assertions.assertThat(writer.write(genericRow)).isTrue();
+                }
+                Assertions.assertThat(writer.write(genericRow)).isFalse();
+            }
+            writer.reset();
+
+            // Write batch records
+            for (int i = 0; i < 2000; i++) {
+                boolean success = writer.write(genericRow);
+                if (!success) {
+                    writer.flush();
+                    writer.reset();
+                    writer.write(genericRow);
+                }
+            }
+
+            if (limitMemory) {
+                
Assertions.assertThat(writer.memoryUsed()).isLessThan(memoryLimit);
+                
Assertions.assertThat(writer.getAllocator().getAllocatedMemory())
+                        .isGreaterThan(memoryLimit)
+                        .isLessThan(2 * memoryLimit);
+            }
+        }
+    }
+
     @Test
     public void testArrowBundleRecords() {
         try (ArrowFormatWriter writer = new ArrowFormatWriter(PRIMITIVE_TYPE, 
4096, true)) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
index bef47edbf3..7ee33e6843 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java
@@ -84,6 +84,7 @@ public abstract class FileFormat {
                         options,
                         options.get(CoreOptions.READ_BATCH_SIZE),
                         options.get(CoreOptions.WRITE_BATCH_SIZE),
+                        options.get(CoreOptions.WRITE_BATCH_MEMORY),
                         options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL),
                         options.get(CoreOptions.FILE_BLOCK_SIZE)));
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
index b726a84f24..79354b9c26 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java
@@ -37,23 +37,35 @@ public interface FileFormatFactory {
         private final Options options;
         private final int readBatchSize;
         private final int writeBatchSize;
+        private final MemorySize writeBatchMemory;
         private final int zstdLevel;
         @Nullable private final MemorySize blockSize;
 
         @VisibleForTesting
         public FormatContext(Options options, int readBatchSize, int 
writeBatchSize) {
-            this(options, readBatchSize, writeBatchSize, 1, null);
+            this(options, readBatchSize, writeBatchSize, 
MemorySize.VALUE_128_MB, 1, null);
         }
 
+        @VisibleForTesting
         public FormatContext(
                 Options options,
                 int readBatchSize,
                 int writeBatchSize,
+                MemorySize writeBatchMemory) {
+            this(options, readBatchSize, writeBatchSize, writeBatchMemory, 1, 
null);
+        }
+
+        public FormatContext(
+                Options options,
+                int readBatchSize,
+                int writeBatchSize,
+                MemorySize writeBatchMemory,
                 int zstdLevel,
                 @Nullable MemorySize blockSize) {
             this.options = options;
             this.readBatchSize = readBatchSize;
             this.writeBatchSize = writeBatchSize;
+            this.writeBatchMemory = writeBatchMemory;
             this.zstdLevel = zstdLevel;
             this.blockSize = blockSize;
         }
@@ -70,6 +82,10 @@ public interface FileFormatFactory {
             return writeBatchSize;
         }
 
+        public MemorySize writeBatchMemory() {
+            return writeBatchMemory;
+        }
+
         public int zstdLevel() {
             return zstdLevel;
         }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
index b51f8fc05d..72159c1ad6 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.format.FileFormatFactory.FormatContext;
 import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -48,7 +49,8 @@ public class ParquetFileFormatTest {
         Options options = new Options();
         options.set(parquetKey, "hello");
         options.set(otherKey, "test");
-        FormatContext context = new FormatContext(options, 1024, 1024, 2, 
null);
+        FormatContext context =
+                new FormatContext(options, 1024, 1024, 
MemorySize.VALUE_128_MB, 2, null);
 
         Options actual = new ParquetFileFormat(context).getOptions();
         assertThat(actual.get(parquetKey)).isEqualTo("hello");
diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java
index 72f99b2a32..c476652640 100644
--- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java
+++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java
@@ -75,7 +75,12 @@ public class LanceFileFormat extends FileFormat {
     @Override
     public FormatWriterFactory createWriterFactory(RowType type) {
         return new LanceWriterFactory(
-                () -> new ArrowFormatWriter(type, 
formatContext.writeBatchSize(), true));
+                () ->
+                        new ArrowFormatWriter(
+                                type,
+                                formatContext.writeBatchSize(),
+                                true,
+                                formatContext.writeBatchMemory().getBytes()));
     }
 
     @Override

Reply via email to