This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 11d09cd7f4 [orc] Limiting Memory Usage of OrcBulkWriter When Writing VectorizedRowBatch (#6590)
11d09cd7f4 is described below

commit 11d09cd7f438e93cbc2e5f89a1bf5368d48db1a9
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Tue Dec 2 15:32:13 2025 +0800

    [orc] Limiting Memory Usage of OrcBulkWriter When Writing VectorizedRowBatch (#6590)
---
 .../apache/paimon/format/orc/OrcFileFormat.java    |  5 +-
 .../apache/paimon/format/orc/OrcWriterFactory.java | 14 +++-
 .../paimon/format/orc/writer/FieldWriter.java      |  3 +-
 .../format/orc/writer/FieldWriterFactory.java      | 77 ++++++++++++++++------
 .../paimon/format/orc/writer/OrcBulkWriter.java    | 18 ++++-
 .../format/orc/writer/RowDataVectorizer.java       |  7 +-
 .../paimon/format/orc/writer/Vectorizer.java       |  3 +-
 .../format/orc/writer/OrcBulkWriterTest.java       |  3 +
 8 files changed, 98 insertions(+), 32 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 76286b8cce..1072f036d0 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -69,6 +69,7 @@ public class OrcFileFormat extends FileFormat {
     private final org.apache.hadoop.conf.Configuration writerConf;
     private final int readBatchSize;
     private final int writeBatchSize;
+    private final MemorySize writeBatchMemory;
     private final boolean deletionVectorsEnabled;
     private final boolean legacyTimestampLtzType;
 
@@ -81,6 +82,7 @@ public class OrcFileFormat extends FileFormat {
         this.orcProperties.forEach((k, v) -> writerConf.set(k.toString(), v.toString()));
         this.readBatchSize = formatContext.readBatchSize();
         this.writeBatchSize = formatContext.writeBatchSize();
+        this.writeBatchMemory = formatContext.writeBatchMemory();
         this.deletionVectorsEnabled = formatContext.options().get(DELETION_VECTORS_ENABLED);
         this.legacyTimestampLtzType = formatContext.options().get(ORC_TIMESTAMP_LTZ_LEGACY_TYPE);
     }
@@ -149,7 +151,8 @@ public class OrcFileFormat extends FileFormat {
                 new RowDataVectorizer(
                         typeDescription, refinedType.getFields(), legacyTimestampLtzType);
 
-        return new OrcWriterFactory(vectorizer, orcProperties, writerConf, writeBatchSize);
+        return new OrcWriterFactory(
+                vectorizer, orcProperties, writerConf, writeBatchSize, writeBatchMemory);
     }
 
     private Properties getOrcProperties(Options options, FormatContext formatContext) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
index 784095f5d9..8be5798e2f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
@@ -25,6 +25,7 @@ import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.format.orc.writer.OrcBulkWriter;
 import org.apache.paimon.format.orc.writer.Vectorizer;
 import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.MemorySize;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -57,6 +58,7 @@ public class OrcWriterFactory implements FormatWriterFactory {
 
     private OrcFile.WriterOptions writerOptions;
     private final int writeBatchSize;
+    private final MemorySize writeBatchMemory;
 
     /**
      * Creates a new OrcBulkWriterFactory using the provided Vectorizer implementation.
@@ -66,7 +68,7 @@ public class OrcWriterFactory implements FormatWriterFactory {
      */
     @VisibleForTesting
     OrcWriterFactory(Vectorizer<InternalRow> vectorizer) {
-        this(vectorizer, new Properties(), new Configuration(false), 1024);
+        this(vectorizer, new Properties(), new Configuration(false), 1024, MemorySize.ZERO);
     }
 
     /**
@@ -81,7 +83,8 @@ public class OrcWriterFactory implements FormatWriterFactory {
             Vectorizer<InternalRow> vectorizer,
             Properties writerProperties,
             Configuration configuration,
-            int writeBatchSize) {
+            int writeBatchSize,
+            MemorySize writeBatchMemory) {
         this.vectorizer = checkNotNull(vectorizer);
         this.writerProperties = checkNotNull(writerProperties);
         this.confMap = new HashMap<>();
@@ -91,6 +94,7 @@ public class OrcWriterFactory implements FormatWriterFactory {
             confMap.put(entry.getKey(), entry.getValue());
         }
         this.writeBatchSize = writeBatchSize;
+        this.writeBatchMemory = writeBatchMemory;
     }
 
     @Override
@@ -117,7 +121,11 @@ public class OrcWriterFactory implements FormatWriterFactory {
         // the key of writer in the ORC memory manager, thus we need to make it unique.
         Path unusedPath = new Path(UUID.randomUUID().toString());
         return new OrcBulkWriter(
-                vectorizer, new WriterImpl(null, unusedPath, opts), out, writeBatchSize);
+                vectorizer,
+                new WriterImpl(null, unusedPath, opts),
+                out,
+                writeBatchSize,
+                writeBatchMemory);
     }
 
     @VisibleForTesting
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
index bf4d0e0260..b61c290a48 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
@@ -24,5 +24,6 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 
 /** Orc field writer. */
 public interface FieldWriter {
-    void write(int rowId, ColumnVector column, DataGetters getters, int columnId);
+
+    int write(int rowId, ColumnVector column, DataGetters getters, int columnId);
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
index 2a65acfa6c..6d8e771a61 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
@@ -70,6 +70,7 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                 BytesColumnVector vector = (BytesColumnVector) column;
                 byte[] bytes = getters.getString(columnId).toBytes();
                 vector.setVal(rowId, bytes, 0, bytes.length);
+                return bytes.length;
             };
 
     private static final FieldWriter BYTES_WRITER =
@@ -77,36 +78,57 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                 BytesColumnVector vector = (BytesColumnVector) column;
                 byte[] bytes = getters.getBinary(columnId);
                 vector.setVal(rowId, bytes, 0, bytes.length);
+                return bytes.length;
             };
 
     private static final FieldWriter BOOLEAN_WRITER =
-            (rowId, column, getters, columnId) ->
-                    ((LongColumnVector) column).vector[rowId] =
-                            getters.getBoolean(columnId) ? 1 : 0;
+            (rowId, column, getters, columnId) -> {
+                ((LongColumnVector) column).vector[rowId] = getters.getBoolean(columnId) ? 1 : 0;
+                // Boolean takes 1 byte
+                return 1;
+            };
 
     private static final FieldWriter INT_WRITER =
-            (rowId, column, getters, columnId) ->
-                    ((LongColumnVector) column).vector[rowId] = getters.getInt(columnId);
+            (rowId, column, getters, columnId) -> {
+                ((LongColumnVector) column).vector[rowId] = getters.getInt(columnId);
+                // Integer takes 4 bytes
+                return 4;
+            };
 
     private static final FieldWriter TINYINT_WRITER =
-            (rowId, column, getters, columnId) ->
-                    ((LongColumnVector) column).vector[rowId] = getters.getByte(columnId);
+            (rowId, column, getters, columnId) -> {
+                ((LongColumnVector) column).vector[rowId] = getters.getByte(columnId);
+                // Byte takes 1 byte
+                return 1;
+            };
 
     private static final FieldWriter SMALLINT_WRITER =
-            (rowId, column, getters, columnId) ->
-                    ((LongColumnVector) column).vector[rowId] = getters.getShort(columnId);
+            (rowId, column, getters, columnId) -> {
+                ((LongColumnVector) column).vector[rowId] = getters.getShort(columnId);
+                // Short takes 2 bytes
+                return 2;
+            };
 
     private static final FieldWriter BIGINT_WRITER =
-            (rowId, column, getters, columnId) ->
-                    ((LongColumnVector) column).vector[rowId] = getters.getLong(columnId);
+            (rowId, column, getters, columnId) -> {
+                ((LongColumnVector) column).vector[rowId] = getters.getLong(columnId);
+                // Long takes 8 bytes
+                return 8;
+            };
 
     private static final FieldWriter FLOAT_WRITER =
-            (rowId, column, getters, columnId) ->
-                    ((DoubleColumnVector) column).vector[rowId] = getters.getFloat(columnId);
+            (rowId, column, getters, columnId) -> {
+                ((DoubleColumnVector) column).vector[rowId] = getters.getFloat(columnId);
+                // Float takes 4 bytes
+                return 4;
+            };
 
     private static final FieldWriter DOUBLE_WRITER =
-            (rowId, column, getters, columnId) ->
-                    ((DoubleColumnVector) column).vector[rowId] = getters.getDouble(columnId);
+            (rowId, column, getters, columnId) -> {
+                ((DoubleColumnVector) column).vector[rowId] = getters.getDouble(columnId);
+                // Double takes 8 bytes
+                return 8;
+            };
 
     private final boolean legacyTimestampLtzType;
 
@@ -186,6 +208,8 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                     getters.getTimestamp(columnId, timestampType.getPrecision()).toSQLTimestamp();
             TimestampColumnVector vector = (TimestampColumnVector) column;
             vector.set(rowId, timestamp);
+            // Timestamp consists of milliseconds (long - 8 bytes) and nanos (int - 4 bytes)
+            return 12;
         };
     }
 
@@ -203,11 +227,13 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                         LocalZoneTimestamp.fromEpochMillis(
                                 localTimestamp.getMillisecond(),
                                 localTimestamp.getNanoOfMillisecond());
-                timestamp = java.sql.Timestamp.from(localZoneTimestamp.toInstant());
+                timestamp = Timestamp.from(localZoneTimestamp.toInstant());
             }
 
             TimestampColumnVector vector = (TimestampColumnVector) column;
             vector.set(rowId, timestamp);
+            // Timestamp consists of milliseconds (long - 8 bytes) and nanos (int - 4 bytes)
+            return 12;
         };
     }
 
@@ -230,6 +256,11 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                             columnId, decimalType.getPrecision(), decimalType.getScale());
             HiveDecimal hiveDecimal = HiveDecimal.create(decimal.toBigDecimal());
             vector.set(rowId, hiveDecimal);
+            // Decimal size using a rough estimate
+            int precision = decimalType.getPrecision();
+            return (precision <= 2)
+                    ? 1
+                    : (precision <= 4) ? 2 : (precision <= 9) ? 4 : (precision <= 18) ? 8 : 16;
         };
     }
 
@@ -247,6 +278,7 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                     listColumnVector.childCount,
                     listColumnVector.offsets[rowId] != 0);
 
+            int totalSize = 0;
             for (int i = 0; i < arrayData.size(); i++) {
                 ColumnVector fieldColumn = listColumnVector.child;
                 int fieldIndex = (int) listColumnVector.offsets[rowId] + i;
@@ -254,9 +286,10 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                     fieldColumn.noNulls = false;
                     fieldColumn.isNull[fieldIndex] = true;
                 } else {
-                    elementWriter.write(fieldIndex, fieldColumn, arrayData, i);
+                    totalSize += elementWriter.write(fieldIndex, fieldColumn, arrayData, i);
                 }
             }
+            return totalSize;
         };
     }
 
@@ -281,6 +314,7 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                     mapColumnVector.childCount,
                     mapColumnVector.offsets[rowId] != 0);
 
+            int totalSize = 0;
             for (int i = 0; i < keyArray.size(); i++) {
                 int fieldIndex = (int) mapColumnVector.offsets[rowId] + i;
 
@@ -289,7 +323,7 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                     keyColumn.noNulls = false;
                     keyColumn.isNull[fieldIndex] = true;
                 } else {
-                    keyWriter.write(fieldIndex, keyColumn, keyArray, i);
+                    totalSize += keyWriter.write(fieldIndex, keyColumn, keyArray, i);
                 }
 
                 ColumnVector valueColumn = mapColumnVector.values;
@@ -297,9 +331,10 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
                     valueColumn.noNulls = false;
                     valueColumn.isNull[fieldIndex] = true;
                 } else {
-                    valueWriter.write(fieldIndex, valueColumn, valueArray, i);
+                    totalSize += valueWriter.write(fieldIndex, valueColumn, valueArray, i);
                 }
             }
+            return totalSize;
         };
     }
 
@@ -312,15 +347,17 @@ public class FieldWriterFactory implements DataTypeVisitor<FieldWriter> {
         return (rowId, column, getters, columnId) -> {
             StructColumnVector structColumnVector = (StructColumnVector) column;
             InternalRow structRow = getters.getRow(columnId, structColumnVector.fields.length);
+            int totalSize = 0;
             for (int i = 0; i < structRow.getFieldCount(); i++) {
                 ColumnVector fieldColumn = structColumnVector.fields[i];
                 if (structRow.isNullAt(i)) {
                     fieldColumn.noNulls = false;
                     fieldColumn.isNull[rowId] = true;
                 } else {
-                    fieldWriters.get(i).write(rowId, fieldColumn, structRow, i);
+                    totalSize += fieldWriters.get(i).write(rowId, fieldColumn, structRow, i);
                 }
             }
+            return totalSize;
         };
     }
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
index 960a6ee15d..c44e3f26d6 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.MemorySize;
 
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.orc.Writer;
@@ -38,22 +39,27 @@ public class OrcBulkWriter implements FormatWriter {
     private final VectorizedRowBatch rowBatch;
     private final PositionOutputStream underlyingStream;
 
+    private long currentBatchMemoryUsage = 0;
+    private final long memoryLimit;
+
     public OrcBulkWriter(
             Vectorizer<InternalRow> vectorizer,
             Writer writer,
             PositionOutputStream underlyingStream,
-            int batchSize) {
+            int batchSize,
+            MemorySize memoryLimit) {
         this.vectorizer = checkNotNull(vectorizer);
         this.writer = checkNotNull(writer);
 
         this.rowBatch = vectorizer.getSchema().createRowBatch(batchSize);
         this.underlyingStream = underlyingStream;
+        this.memoryLimit = memoryLimit.getBytes();
     }
 
     @Override
     public void addElement(InternalRow element) throws IOException {
-        vectorizer.vectorize(element, rowBatch);
-        if (rowBatch.size == rowBatch.getMaxSize()) {
+        currentBatchMemoryUsage += vectorizer.vectorize(element, rowBatch);
+        if (rowBatch.size == rowBatch.getMaxSize() || currentBatchMemoryUsage >= this.memoryLimit) {
             flush();
         }
     }
@@ -62,6 +68,7 @@ public class OrcBulkWriter implements FormatWriter {
         if (rowBatch.size != 0) {
             writer.addRowBatch(rowBatch);
             rowBatch.reset();
+            currentBatchMemoryUsage = 0;
         }
     }
 
@@ -88,4 +95,9 @@ public class OrcBulkWriter implements FormatWriter {
     VectorizedRowBatch getRowBatch() {
         return rowBatch;
     }
+
+    @VisibleForTesting
+    long getMemoryLimit() {
+        return memoryLimit;
+    }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
index e36fbdfee9..5d9a3fc889 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
@@ -50,9 +50,9 @@ public class RowDataVectorizer extends Vectorizer<InternalRow> {
         }
     }
 
-    @Override
-    public void vectorize(InternalRow row, VectorizedRowBatch batch) {
+    public long vectorize(InternalRow row, VectorizedRowBatch batch) {
         int rowId = batch.size++;
+        int memBytes = 0;
         for (int i = 0; i < fieldNames.length; ++i) {
             ColumnVector fieldColumn = batch.cols[i];
             if (row.isNullAt(i)) {
@@ -69,8 +69,9 @@ public class RowDataVectorizer extends Vectorizer<InternalRow> {
                 fieldColumn.noNulls = false;
                 fieldColumn.isNull[rowId] = true;
             } else {
-                fieldWriters[i].write(rowId, fieldColumn, row, i);
+                memBytes += fieldWriters[i].write(rowId, fieldColumn, row, i);
             }
         }
+        return memBytes;
     }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java
index 2add46531a..c73c9a1bdd 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java
@@ -59,7 +59,8 @@ public abstract class Vectorizer<T> implements Serializable {
      *
      * @param element The input element
      * @param batch The batch to write the ColumnVectors
+     * @return The memory size in bytes used by the element
      * @throws IOException if there is an error while transforming the input.
      */
-    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
+    public abstract long vectorize(T element, VectorizedRowBatch batch) throws IOException;
 }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
index 1510a93823..4690437c6a 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.format.orc.OrcWriterFactory;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -43,6 +44,7 @@ class OrcBulkWriterTest {
     void testRowBatch(@TempDir java.nio.file.Path tempDir) throws IOException {
         Options options = new Options();
         options.set(CoreOptions.WRITE_BATCH_SIZE, 1);
+        options.set(CoreOptions.WRITE_BATCH_MEMORY, MemorySize.parse("1 Kb"));
         FileFormat orc = FileFormat.fromIdentifier("orc", options);
         Assertions.assertThat(orc).isInstanceOf(OrcFileFormat.class);
 
@@ -62,5 +64,6 @@ class OrcBulkWriterTest {
 
         OrcBulkWriter orcBulkWriter = (OrcBulkWriter) formatWriter;
         Assertions.assertThat(orcBulkWriter.getRowBatch().getMaxSize()).isEqualTo(1);
+        Assertions.assertThat(orcBulkWriter.getMemoryLimit()).isEqualTo(1024);
     }
 }
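
For context on the patch above: each FieldWriter now returns an estimated byte count for the value it writes, RowDataVectorizer sums those estimates per row, and OrcBulkWriter flushes the VectorizedRowBatch once either the configured batch row count or the configured batch memory size is reached. The self-contained sketch below illustrates only that accumulate-and-flush policy; the class and method names (BatchFlusher, estimateRowBytes) are hypothetical and are not Paimon APIs.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class BatchFlusher {
    private final int maxRows;            // analogous to the ORC write batch size
    private final long memoryLimitBytes;  // analogous to the new write batch memory limit

    private int rows = 0;
    private long estimatedBytes = 0;
    private int flushes = 0;

    public BatchFlusher(int maxRows, long memoryLimitBytes) {
        this.maxRows = maxRows;
        this.memoryLimitBytes = memoryLimitBytes;
    }

    // Rough per-row estimate, mirroring the fixed sizes the field writers return
    // (8 bytes for long/double, 4 for int/float, raw length for string/binary).
    static long estimateRowBytes(List<?> row) {
        long bytes = 0;
        for (Object field : row) {
            if (field instanceof byte[]) {
                bytes += ((byte[]) field).length;
            } else if (field instanceof String) {
                bytes += ((String) field).getBytes(StandardCharsets.UTF_8).length;
            } else if (field instanceof Long || field instanceof Double) {
                bytes += 8;
            } else if (field != null) {
                bytes += 4;
            }
        }
        return bytes;
    }

    // Add one row; flush when either limit is hit, as OrcBulkWriter#addElement now does.
    public void add(List<?> row) {
        rows++;
        estimatedBytes += estimateRowBytes(row);
        if (rows >= maxRows || estimatedBytes >= memoryLimitBytes) {
            flush();
        }
    }

    private void flush() {
        // In the real writer this is where the batch is handed to the ORC writer and reset.
        flushes++;
        rows = 0;
        estimatedBytes = 0;
    }

    public int flushCount() {
        return flushes;
    }

    public static void main(String[] args) {
        // Flush after 1024 rows or roughly 1 KiB of estimated data, whichever comes first.
        BatchFlusher flusher = new BatchFlusher(1024, 1024);
        for (int i = 0; i < 10; i++) {
            flusher.add(Arrays.asList(1L, new byte[200])); // ~208 estimated bytes per row
        }
        System.out.println("flushes=" + flusher.flushCount()); // prints flushes=2
    }
}

The point of the memory-based trigger is that wide rows with large string or binary fields no longer need to fill the full row-count quota before the batch is handed off, which bounds the memory held by a single VectorizedRowBatch.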
