This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 737da60bb3 [arrow] support ArrowFormatWriter to use an external
BufferAllocator to provide finer-grained memory control and monitor allocation
events. (#5279)
737da60bb3 is described below
commit 737da60bb320930fa0d8f264643b6bc7561a9719
Author: liming.1018 <[email protected]>
AuthorDate: Thu Mar 13 17:31:11 2025 +0800
[arrow] support ArrowFormatWriter to use an external BufferAllocator to
provide finer-grained memory control and monitor allocation events. (#5279)
---
.../paimon/arrow/vector/ArrowFormatCWriter.java | 17 ++++--
.../paimon/arrow/vector/ArrowFormatWriter.java | 12 +++-
.../paimon/arrow/vector/ArrowFormatWriterTest.java | 68 +++++++++++++++-------
3 files changed, 69 insertions(+), 28 deletions(-)
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
index eb7634fcbf..9dfbbf5782 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatCWriter.java
@@ -24,7 +24,7 @@ import org.apache.paimon.types.RowType;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
-import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
/**
@@ -38,8 +38,17 @@ public class ArrowFormatCWriter implements AutoCloseable {
private final ArrowFormatWriter realWriter;
public ArrowFormatCWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) {
- this.realWriter = new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive);
- RootAllocator allocator = realWriter.getAllocator();
+ this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive));
+ }
+
+ public ArrowFormatCWriter(
+ RowType rowType, int writeBatchSize, boolean caseSensitive, BufferAllocator allocator) {
+ this(new ArrowFormatWriter(rowType, writeBatchSize, caseSensitive, allocator));
+ }
+
+ private ArrowFormatCWriter(ArrowFormatWriter arrowFormatWriter) {
+ this.realWriter = arrowFormatWriter;
+ BufferAllocator allocator = realWriter.getAllocator();
array = ArrowArray.allocateNew(allocator);
schema = ArrowSchema.allocateNew(allocator);
}
@@ -79,7 +88,7 @@ public class ArrowFormatCWriter implements AutoCloseable {
return realWriter.getVectorSchemaRoot();
}
- public RootAllocator getAllocator() {
+ public BufferAllocator getAllocator() {
return realWriter.getAllocator();
}
}
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
index 6bdc234204..a377dd9f04 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/vector/ArrowFormatWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.arrow.writer.ArrowFieldWriterFactoryVisitor;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.RowType;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.util.OversizedAllocationException;
@@ -40,11 +41,16 @@ public class ArrowFormatWriter implements AutoCloseable {
private final int batchSize;
- private final RootAllocator allocator;
+ private final BufferAllocator allocator;
private int rowId;
public ArrowFormatWriter(RowType rowType, int writeBatchSize, boolean caseSensitive) {
- allocator = new RootAllocator();
+ this(rowType, writeBatchSize, caseSensitive, new RootAllocator());
+ }
+
+ public ArrowFormatWriter(
+ RowType rowType, int writeBatchSize, boolean caseSensitive, BufferAllocator allocator) {
+ this.allocator = allocator;
vectorSchemaRoot = ArrowUtils.createVectorSchemaRoot(rowType, allocator, caseSensitive);
@@ -105,7 +111,7 @@ public class ArrowFormatWriter implements AutoCloseable {
return vectorSchemaRoot;
}
- public RootAllocator getAllocator() {
+ public BufferAllocator getAllocator() {
return allocator;
}
}
diff --git a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
index f4eee75185..63e53ca9b8 100644
--- a/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
+++ b/paimon-arrow/src/test/java/org/apache/paimon/arrow/vector/ArrowFormatWriterTest.java
@@ -30,10 +30,15 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.StringUtils;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.OutOfMemoryException;
+import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -193,36 +198,57 @@ public class ArrowFormatWriterTest {
@Test
public void testCWriter() {
try (ArrowFormatCWriter writer = new ArrowFormatCWriter(PRIMITIVE_TYPE, 4096, true)) {
- List<InternalRow> list = new ArrayList<>();
- List<InternalRow.FieldGetter> fieldGetters = new ArrayList<>();
+ writeAndCheck(writer);
+ }
+ }
- for (int i = 0; i < PRIMITIVE_TYPE.getFieldCount(); i++) {
- fieldGetters.add(InternalRow.createFieldGetter(PRIMITIVE_TYPE.getTypeAt(i), i));
- }
- for (int i = 0; i < 1000; i++) {
- list.add(GenericRow.of(randomRowValues(null)));
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testWriteWithExternalAllocator(boolean allocationFailed) {
+ long maxAllocation = allocationFailed ? 1024L : Long.MAX_VALUE;
+ try (RootAllocator rootAllocator = new RootAllocator();
+ BufferAllocator allocator =
+ rootAllocator.newChildAllocator("paimonWriter", 0, maxAllocation);
+ ArrowFormatCWriter writer =
+ new ArrowFormatCWriter(PRIMITIVE_TYPE, 4096, true, allocator)) {
+ writeAndCheck(writer);
+ } catch (OutOfMemoryException e) {
+ if (!allocationFailed) {
+ throw e;
}
+ }
+ }
- list.forEach(writer::write);
+ private void writeAndCheck(ArrowFormatCWriter writer) {
+ List<InternalRow> list = new ArrayList<>();
+ List<InternalRow.FieldGetter> fieldGetters = new ArrayList<>();
- writer.flush();
- VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
+ for (int i = 0; i < PRIMITIVE_TYPE.getFieldCount(); i++) {
+ fieldGetters.add(InternalRow.createFieldGetter(PRIMITIVE_TYPE.getTypeAt(i), i));
+ }
+ for (int i = 0; i < 1000; i++) {
+ list.add(GenericRow.of(randomRowValues(null)));
+ }
- ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE, true);
- Iterable<InternalRow> rows = arrowBatchReader.readBatch(vectorSchemaRoot);
+ list.forEach(writer::write);
- Iterator<InternalRow> iterator = rows.iterator();
- for (int i = 0; i < 1000; i++) {
- InternalRow actual = iterator.next();
- InternalRow expectec = list.get(i);
+ writer.flush();
+ VectorSchemaRoot vectorSchemaRoot = writer.getVectorSchemaRoot();
- for (InternalRow.FieldGetter fieldGetter : fieldGetters) {
- Assertions.assertThat(fieldGetter.getFieldOrNull(actual))
- .isEqualTo(fieldGetter.getFieldOrNull(expectec));
- }
+ ArrowBatchReader arrowBatchReader = new ArrowBatchReader(PRIMITIVE_TYPE, true);
+ Iterable<InternalRow> rows = arrowBatchReader.readBatch(vectorSchemaRoot);
+
+ Iterator<InternalRow> iterator = rows.iterator();
+ for (int i = 0; i < 1000; i++) {
+ InternalRow actual = iterator.next();
+ InternalRow expectec = list.get(i);
+
+ for (InternalRow.FieldGetter fieldGetter : fieldGetters) {
+ Assertions.assertThat(fieldGetter.getFieldOrNull(actual))
+ .isEqualTo(fieldGetter.getFieldOrNull(expectec));
}
- writer.release();
}
+ writer.release();
}
private Object[] randomRowValues(boolean[] nullable) {