This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 3d738a9b8a [format] Introduce 'write.batch-memory' to control memory in arrow (#5988) 3d738a9b8a is described below commit 3d738a9b8a7ed7260e4d59c400a053d8bdfc845e Author: YeJunHao <41894543+leaves12...@users.noreply.github.com> AuthorDate: Thu Jul 31 12:10:22 2025 +0800 [format] Introduce 'write.batch-memory' to control memory in arrow (#5988) --- .../shortcodes/generated/core_configuration.html | 6 +++ .../main/java/org/apache/paimon/CoreOptions.java | 6 +++ .../paimon/arrow/vector/ArrowFormatCWriter.java | 14 +++++- .../paimon/arrow/vector/ArrowFormatWriter.java | 43 +++++++++++++++-- .../paimon/arrow/vector/ArrowFormatWriterTest.java | 54 ++++++++++++++++++++++ .../java/org/apache/paimon/format/FileFormat.java | 1 + .../apache/paimon/format/FileFormatFactory.java | 18 +++++++- .../format/parquet/ParquetFileFormatTest.java | 4 +- .../paimon/format/lance/LanceFileFormat.java | 7 ++- 9 files changed, 144 insertions(+), 9 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 85fa54aa63..c287901335 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -1248,6 +1248,12 @@ If the data size allocated for the sorting task is uneven,which may lead to perf <td>Boolean</td> <td>If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.</td> </tr> + <tr> + <td><h5>write.batch-memory</h5></td> + <td style="word-wrap: break-word;">128 mb</td> + <td>MemorySize</td> + <td>Write batch memory for any file format if it supports.</td> + </tr> <tr> <td><h5>write.batch-size</h5></td> <td style="word-wrap: break-word;">1024</td> diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index ca2c67d0f4..54802731b4 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1179,6 +1179,12 @@ public class CoreOptions implements Serializable { .withFallbackKeys("orc.write.batch-size") .withDescription("Write batch size for any file format if it supports."); + public static final ConfigOption<MemorySize> WRITE_BATCH_MEMORY = + key("write.batch-memory") + .memoryType() + .defaultValue(MemorySize.parse("128 mb")) + .withDescription("Write batch memory for any file format if it supports."); + public static final ConfigOption<String> CONSUMER_ID = key("consumer-id") .stringType() diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java index 2f1e2f2a53..afa58250c3 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java @@ -27,6 +27,8 @@ import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; +import javax.annotation.Nullable; + /** * This writer could flush to c struct, but you need to release it, except it has been released in c * code. @@ -38,12 +40,20 @@ public class ArrowFormatCWriter implements AutoCloseable { private final ArrowFormatWriter realWriter; public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) { - this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive)); + this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, null)); + } + + public ArrowFormatCWriter( + RowType rowType, + int writeBatchSize, + boolean caseSensitive, + @Nullable Long memoryUsedMaxInVSR) { + this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, memoryUsedMaxInVSR)); } public ArrowFormatCWriter( RowType rowType, int writeBatchSize, boolean caseSensitive, BufferAllocator allocator) { - this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, allocator)); + this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, allocator, null)); } private ArrowFormatCWriter(ArrowFormatWriter arrowFormatWriter) { diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java index a957fa2288..44c764804b 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java @@ -27,11 +27,14 @@ import org.apache.paimon.types.RowType; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.util.OversizedAllocationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + /** Write from {@link InternalRow} to {@link VectorSchemaRoot}. */ public class ArrowFormatWriter implements AutoCloseable { @@ -39,18 +42,29 @@ public class ArrowFormatWriter implements AutoCloseable { private final VectorSchemaRoot vectorSchemaRoot; private final ArrowFieldWriter[] fieldWriters; - private final int batchSize; - private final BufferAllocator allocator; + @Nullable private final Long memoryUsedMaxInBytes; private int rowId; public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) { - this(rowType, writeBatchSize, caseSensitive, new RootAllocator()); + this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), null); } public ArrowFormatWriter( - RowType rowType, int writeBatchSize, boolean caseSensitive, BufferAllocator allocator) { + RowType rowType, + int writeBatchSize, + boolean caseSensitive, + @Nullable Long memoryUsedMaxInBytes) { + this(rowType, writeBatchSize, caseSensitive, new RootAllocator(), memoryUsedMaxInBytes); + } + + public ArrowFormatWriter( + RowType rowType, + int writeBatchSize, + boolean caseSensitive, + BufferAllocator allocator, + @Nullable Long memoryUsedMaxInBytes) { this.allocator = allocator; vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, allocator, caseSensitive); @@ -65,6 +79,7 @@ public class ArrowFormatWriter implements AutoCloseable { } this.batchSize = writeBatchSize; + this.memoryUsedMaxInBytes = memoryUsedMaxInBytes; } public void flush() { @@ -75,6 +90,17 @@ public class ArrowFormatWriter implements AutoCloseable { if (rowId >= batchSize) { return false; } + if (memoryUsedMaxInBytes != null && rowId % 32 == 0) { + long memoryUsed = memoryUsed(); + if (memoryUsed > memoryUsedMaxInBytes) { + LOG.debug( + "Memory used by ArrowFormatCWriter exceeds the limit: {} > {} while writing record row id: {}", + memoryUsed, + memoryUsedMaxInBytes, + rowId); + return false; + } + } for (int i = 0; i < currentRow.getFieldCount(); i++) { try { fieldWriters[i].write(rowId, currentRow, i); @@ -89,6 +115,15 @@ public class ArrowFormatWriter implements AutoCloseable { return true; } + public long memoryUsed() { + vectorSchemaRoot.setRowCount(rowId); + long memoryUsed = 0; + for (FieldVector fieldVector : vectorSchemaRoot.getFieldVectors()) { + memoryUsed += fieldVector.getBufferSize(); + } + return memoryUsed; + } + public boolean empty() { return rowId == 0; } diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java index d7e857c111..d7ee8ca1a5 100644 --- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java +++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java @@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.math.BigDecimal; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Random; @@ -165,6 +166,59 @@ public class ArrowFormatWriterTest { } } + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testWriteWithMemoryLimit(boolean limitMemory) { + RowType rowType = + new RowType( + Arrays.asList( + new DataField(0, "f0", DataTypes.BYTES()), + new DataField(1, "f1", DataTypes.BYTES()))); + Long memoryLimit = limitMemory ? 100 * 1024 * 1024L : null; + try (ArrowFormatWriter writer = new ArrowFormatWriter(rowType, 4096, true, memoryLimit)) { + + GenericRow genericRow = new GenericRow(2); + genericRow.setField(0, randomBytes(1024 * 1024, 1024 * 1024)); + genericRow.setField(1, randomBytes(1024 * 1024, 1024 * 1024)); + + // normal write + for (int i = 0; i < 200; i++) { + boolean success = writer.write(genericRow); + if (!success) { + writer.flush(); + writer.reset(); + writer.write(genericRow); + } + } + writer.reset(); + + if (limitMemory) { + for (int i = 0; i < 64; i++) { + Assertions.assertThat(writer.write(genericRow)).isTrue(); + } + Assertions.assertThat(writer.write(genericRow)).isFalse(); + } + writer.reset(); + + // Write batch records + for (int i = 0; i < 2000; i++) { + boolean success = writer.write(genericRow); + if (!success) { + writer.flush(); + writer.reset(); + writer.write(genericRow); + } + } + + if (limitMemory) { + Assertions.assertThat(writer.memoryUsed()).isLessThan(memoryLimit); + Assertions.assertThat(writer.getAllocator().getAllocatedMemory()) + .isGreaterThan(memoryLimit) + .isLessThan(2 * memoryLimit); + } + } + } + @Test public void testArrowBundleRecords() { try (ArrowFormatWriter writer = new ArrowFormatWriter(PRIMITIVE_TYPE, 4096, true)) { diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java index bef47edbf3..7ee33e6843 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormat.java @@ -84,6 +84,7 @@ public abstract class FileFormat { options, options.get(CoreOptions.READ_BATCH_SIZE), options.get(CoreOptions.WRITE_BATCH_SIZE), + options.get(CoreOptions.WRITE_BATCH_MEMORY), options.get(CoreOptions.FILE_COMPRESSION_ZSTD_LEVEL), options.get(CoreOptions.FILE_BLOCK_SIZE))); } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java index b726a84f24..79354b9c26 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FileFormatFactory.java @@ -37,23 +37,35 @@ public interface FileFormatFactory { private final Options options; private final int readBatchSize; private final int writeBatchSize; + private final MemorySize writeBatchMemory; private final int zstdLevel; @Nullable private final MemorySize blockSize; @VisibleForTesting public FormatContext(Options options, int readBatchSize, int writeBatchSize) { - this(options, readBatchSize, writeBatchSize, 1, null); + this(options, readBatchSize, writeBatchSize, MemorySize.VALUE_128_MB, 1, null); } + @VisibleForTesting public FormatContext( Options options, int readBatchSize, int writeBatchSize, + MemorySize writeBatchMemory) { + this(options, readBatchSize, writeBatchSize, writeBatchMemory, 1, null); + } + + public FormatContext( + Options options, + int readBatchSize, + int writeBatchSize, + MemorySize writeBatchMemory, int zstdLevel, @Nullable MemorySize blockSize) { this.options = options; this.readBatchSize = readBatchSize; this.writeBatchSize = writeBatchSize; + this.writeBatchMemory = writeBatchMemory; this.zstdLevel = zstdLevel; this.blockSize = blockSize; } @@ -70,6 +82,10 @@ public interface FileFormatFactory { return writeBatchSize; } + public MemorySize writeBatchMemory() { + return writeBatchMemory; + } + public int zstdLevel() { return zstdLevel; } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java index b51f8fc05d..72159c1ad6 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFileFormatTest.java @@ -22,6 +22,7 @@ import org.apache.paimon.format.FileFormatFactory.FormatContext; import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.ConfigOptions; +import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -48,7 +49,8 @@ public class ParquetFileFormatTest { Options options = new Options(); options.set(parquetKey, "hello"); options.set(otherKey, "test"); - FormatContext context = new FormatContext(options, 1024, 1024, 2, null); + FormatContext context = + new FormatContext(options, 1024, 1024, MemorySize.VALUE_128_MB, 2, null); Options actual = new ParquetFileFormat(context).getOptions(); assertThat(actual.get(parquetKey)).isEqualTo("hello"); diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java index 72f99b2a32..c476652640 100644 --- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java +++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceFileFormat.java @@ -75,7 +75,12 @@ public class LanceFileFormat extends FileFormat { @Override public FormatWriterFactory createWriterFactory(RowType type) { return new LanceWriterFactory( - () -> new ArrowFormatWriter(type, formatContext.writeBatchSize(), true)); + () -> + new ArrowFormatWriter( + type, + formatContext.writeBatchSize(), + true, + formatContext.writeBatchMemory().getBytes())); } @Override