This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 11d09cd7f4 [orc] Limiting Memory Usage of OrcBulkWriter When Writing
VectorizedRowBatch (#6590)
11d09cd7f4 is described below
commit 11d09cd7f438e93cbc2e5f89a1bf5368d48db1a9
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Tue Dec 2 15:32:13 2025 +0800
[orc] Limiting Memory Usage of OrcBulkWriter When Writing
VectorizedRowBatch (#6590)
---
.../apache/paimon/format/orc/OrcFileFormat.java | 5 +-
.../apache/paimon/format/orc/OrcWriterFactory.java | 14 +++-
.../paimon/format/orc/writer/FieldWriter.java | 3 +-
.../format/orc/writer/FieldWriterFactory.java | 77 ++++++++++++++++------
.../paimon/format/orc/writer/OrcBulkWriter.java | 18 ++++-
.../format/orc/writer/RowDataVectorizer.java | 7 +-
.../paimon/format/orc/writer/Vectorizer.java | 3 +-
.../format/orc/writer/OrcBulkWriterTest.java | 3 +
8 files changed, 98 insertions(+), 32 deletions(-)
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index 76286b8cce..1072f036d0 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -69,6 +69,7 @@ public class OrcFileFormat extends FileFormat {
private final org.apache.hadoop.conf.Configuration writerConf;
private final int readBatchSize;
private final int writeBatchSize;
+ private final MemorySize writeBatchMemory;
private final boolean deletionVectorsEnabled;
private final boolean legacyTimestampLtzType;
@@ -81,6 +82,7 @@ public class OrcFileFormat extends FileFormat {
this.orcProperties.forEach((k, v) -> writerConf.set(k.toString(),
v.toString()));
this.readBatchSize = formatContext.readBatchSize();
this.writeBatchSize = formatContext.writeBatchSize();
+ this.writeBatchMemory = formatContext.writeBatchMemory();
this.deletionVectorsEnabled =
formatContext.options().get(DELETION_VECTORS_ENABLED);
this.legacyTimestampLtzType =
formatContext.options().get(ORC_TIMESTAMP_LTZ_LEGACY_TYPE);
}
@@ -149,7 +151,8 @@ public class OrcFileFormat extends FileFormat {
new RowDataVectorizer(
typeDescription, refinedType.getFields(),
legacyTimestampLtzType);
- return new OrcWriterFactory(vectorizer, orcProperties, writerConf,
writeBatchSize);
+ return new OrcWriterFactory(
+ vectorizer, orcProperties, writerConf, writeBatchSize,
writeBatchMemory);
}
private Properties getOrcProperties(Options options, FormatContext
formatContext) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
index 784095f5d9..8be5798e2f 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcWriterFactory.java
@@ -25,6 +25,7 @@ import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.orc.writer.OrcBulkWriter;
import org.apache.paimon.format.orc.writer.Vectorizer;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.MemorySize;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -57,6 +58,7 @@ public class OrcWriterFactory implements FormatWriterFactory {
private OrcFile.WriterOptions writerOptions;
private final int writeBatchSize;
+ private final MemorySize writeBatchMemory;
/**
* Creates a new OrcBulkWriterFactory using the provided Vectorizer
implementation.
@@ -66,7 +68,9 @@ public class OrcWriterFactory implements FormatWriterFactory {
*/
@VisibleForTesting
OrcWriterFactory(Vectorizer<InternalRow> vectorizer) {
- this(vectorizer, new Properties(), new Configuration(false), 1024);
+ // NOTE(review): MemorySize.ZERO is passed as the batch memory limit; confirm that
+ // OrcBulkWriter treats a non-positive limit as disabled instead of flushing every row.
+ this(vectorizer, new Properties(), new Configuration(false), 1024,
MemorySize.ZERO);
}
/**
@@ -81,7 +85,8 @@ public class OrcWriterFactory implements FormatWriterFactory {
Vectorizer<InternalRow> vectorizer,
Properties writerProperties,
Configuration configuration,
- int writeBatchSize) {
+ int writeBatchSize,
+ MemorySize writeBatchMemory) {
this.vectorizer = checkNotNull(vectorizer);
this.writerProperties = checkNotNull(writerProperties);
this.confMap = new HashMap<>();
@@ -91,6 +96,7 @@ public class OrcWriterFactory implements FormatWriterFactory {
confMap.put(entry.getKey(), entry.getValue());
}
this.writeBatchSize = writeBatchSize;
+ this.writeBatchMemory = writeBatchMemory;
}
@Override
@@ -117,7 +123,11 @@ public class OrcWriterFactory implements
FormatWriterFactory {
// the key of writer in the ORC memory manager, thus we need to make
it unique.
Path unusedPath = new Path(UUID.randomUUID().toString());
return new OrcBulkWriter(
- vectorizer, new WriterImpl(null, unusedPath, opts), out,
writeBatchSize);
+ vectorizer,
+ new WriterImpl(null, unusedPath, opts),
+ out,
+ writeBatchSize,
+ writeBatchMemory);
}
@VisibleForTesting
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
index bf4d0e0260..b61c290a48 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriter.java
@@ -24,5 +24,11 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
/** Orc field writer. */
public interface FieldWriter {
- void write(int rowId, ColumnVector column, DataGetters getters, int
columnId);
+
+ /**
+ * Writes field {@code columnId} of {@code getters} into row {@code rowId} of {@code column}.
+ *
+ * @return an estimate, in bytes, of the data written for this value
+ */
+ int write(int rowId, ColumnVector column, DataGetters getters, int
columnId);
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
index 2a65acfa6c..6d8e771a61 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
@@ -70,6 +70,7 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
BytesColumnVector vector = (BytesColumnVector) column;
byte[] bytes = getters.getString(columnId).toBytes();
vector.setVal(rowId, bytes, 0, bytes.length);
+ return bytes.length;
};
private static final FieldWriter BYTES_WRITER =
@@ -77,36 +78,57 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
BytesColumnVector vector = (BytesColumnVector) column;
byte[] bytes = getters.getBinary(columnId);
vector.setVal(rowId, bytes, 0, bytes.length);
+ return bytes.length;
};
private static final FieldWriter BOOLEAN_WRITER =
- (rowId, column, getters, columnId) ->
- ((LongColumnVector) column).vector[rowId] =
- getters.getBoolean(columnId) ? 1 : 0;
+ (rowId, column, getters, columnId) -> {
+ ((LongColumnVector) column).vector[rowId] =
getters.getBoolean(columnId) ? 1 : 0;
+ // Estimated size: 1 byte, the logical width of a boolean value.
+ return 1;
+ };
private static final FieldWriter INT_WRITER =
- (rowId, column, getters, columnId) ->
- ((LongColumnVector) column).vector[rowId] =
getters.getInt(columnId);
+ (rowId, column, getters, columnId) -> {
+ ((LongColumnVector) column).vector[rowId] =
getters.getInt(columnId);
+ // Estimated size: 4 bytes, the logical width of an int value.
+ return 4;
+ };
private static final FieldWriter TINYINT_WRITER =
- (rowId, column, getters, columnId) ->
- ((LongColumnVector) column).vector[rowId] =
getters.getByte(columnId);
+ (rowId, column, getters, columnId) -> {
+ ((LongColumnVector) column).vector[rowId] =
getters.getByte(columnId);
+ // Estimated size: 1 byte, the logical width of a tinyint value.
+ return 1;
+ };
private static final FieldWriter SMALLINT_WRITER =
- (rowId, column, getters, columnId) ->
- ((LongColumnVector) column).vector[rowId] =
getters.getShort(columnId);
+ (rowId, column, getters, columnId) -> {
+ ((LongColumnVector) column).vector[rowId] =
getters.getShort(columnId);
+ // Estimated size: 2 bytes, the logical width of a smallint value.
+ return 2;
+ };
private static final FieldWriter BIGINT_WRITER =
- (rowId, column, getters, columnId) ->
- ((LongColumnVector) column).vector[rowId] =
getters.getLong(columnId);
+ (rowId, column, getters, columnId) -> {
+ ((LongColumnVector) column).vector[rowId] =
getters.getLong(columnId);
+ // Estimated size: 8 bytes, the logical width of a bigint value.
+ return 8;
+ };
private static final FieldWriter FLOAT_WRITER =
- (rowId, column, getters, columnId) ->
- ((DoubleColumnVector) column).vector[rowId] =
getters.getFloat(columnId);
+ (rowId, column, getters, columnId) -> {
+ ((DoubleColumnVector) column).vector[rowId] =
getters.getFloat(columnId);
+ // Estimated as 4 bytes (logical float width), though stored in a DoubleColumnVector.
+ return 4;
+ };
private static final FieldWriter DOUBLE_WRITER =
- (rowId, column, getters, columnId) ->
- ((DoubleColumnVector) column).vector[rowId] =
getters.getDouble(columnId);
+ (rowId, column, getters, columnId) -> {
+ ((DoubleColumnVector) column).vector[rowId] =
getters.getDouble(columnId);
+ // Estimated size: 8 bytes, the logical width of a double value.
+ return 8;
+ };
private final boolean legacyTimestampLtzType;
@@ -186,6 +208,8 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
getters.getTimestamp(columnId,
timestampType.getPrecision()).toSQLTimestamp();
TimestampColumnVector vector = (TimestampColumnVector) column;
vector.set(rowId, timestamp);
+ // Timestamp consists of milliseconds (long - 8 bytes) and nanos
(int - 4 bytes)
+ return 12;
};
}
@@ -203,11 +227,13 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
LocalZoneTimestamp.fromEpochMillis(
localTimestamp.getMillisecond(),
localTimestamp.getNanoOfMillisecond());
- timestamp =
java.sql.Timestamp.from(localZoneTimestamp.toInstant());
+ timestamp = Timestamp.from(localZoneTimestamp.toInstant());
}
TimestampColumnVector vector = (TimestampColumnVector) column;
vector.set(rowId, timestamp);
+ // Timestamp consists of milliseconds (long - 8 bytes) and nanos
(int - 4 bytes)
+ return 12;
};
}
@@ -230,6 +256,11 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
columnId, decimalType.getPrecision(),
decimalType.getScale());
HiveDecimal hiveDecimal =
HiveDecimal.create(decimal.toBigDecimal());
vector.set(rowId, hiveDecimal);
+ // Decimal size using a rough estimate
+ int precision = decimalType.getPrecision();
+ return (precision <= 2)
+ ? 1
+ : (precision <= 4) ? 2 : (precision <= 9) ? 4 : (precision
<= 18) ? 8 : 16;
};
}
@@ -247,6 +278,7 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
listColumnVector.childCount,
listColumnVector.offsets[rowId] != 0);
+ int totalSize = 0;
for (int i = 0; i < arrayData.size(); i++) {
ColumnVector fieldColumn = listColumnVector.child;
int fieldIndex = (int) listColumnVector.offsets[rowId] + i;
@@ -254,9 +286,10 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
fieldColumn.noNulls = false;
fieldColumn.isNull[fieldIndex] = true;
} else {
- elementWriter.write(fieldIndex, fieldColumn, arrayData, i);
+ totalSize += elementWriter.write(fieldIndex, fieldColumn,
arrayData, i);
}
}
+ return totalSize;
};
}
@@ -281,6 +314,7 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
mapColumnVector.childCount,
mapColumnVector.offsets[rowId] != 0);
+ int totalSize = 0;
for (int i = 0; i < keyArray.size(); i++) {
int fieldIndex = (int) mapColumnVector.offsets[rowId] + i;
@@ -289,7 +323,7 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
keyColumn.noNulls = false;
keyColumn.isNull[fieldIndex] = true;
} else {
- keyWriter.write(fieldIndex, keyColumn, keyArray, i);
+ totalSize += keyWriter.write(fieldIndex, keyColumn,
keyArray, i);
}
ColumnVector valueColumn = mapColumnVector.values;
@@ -297,9 +331,10 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
valueColumn.noNulls = false;
valueColumn.isNull[fieldIndex] = true;
} else {
- valueWriter.write(fieldIndex, valueColumn, valueArray, i);
+ totalSize += valueWriter.write(fieldIndex, valueColumn,
valueArray, i);
}
}
+ return totalSize;
};
}
@@ -312,15 +347,17 @@ public class FieldWriterFactory implements
DataTypeVisitor<FieldWriter> {
return (rowId, column, getters, columnId) -> {
StructColumnVector structColumnVector = (StructColumnVector)
column;
InternalRow structRow = getters.getRow(columnId,
structColumnVector.fields.length);
+ int totalSize = 0;
for (int i = 0; i < structRow.getFieldCount(); i++) {
ColumnVector fieldColumn = structColumnVector.fields[i];
if (structRow.isNullAt(i)) {
fieldColumn.noNulls = false;
fieldColumn.isNull[rowId] = true;
} else {
- fieldWriters.get(i).write(rowId, fieldColumn, structRow,
i);
+ totalSize += fieldWriters.get(i).write(rowId, fieldColumn,
structRow, i);
}
}
+ return totalSize;
};
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
index 960a6ee15d..c44e3f26d6 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/OrcBulkWriter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.MemorySize;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.Writer;
@@ -38,22 +39,33 @@ public class OrcBulkWriter implements FormatWriter {
private final VectorizedRowBatch rowBatch;
private final PositionOutputStream underlyingStream;
+ /** Estimated bytes of the rows added to {@link #rowBatch} since the last flush. */
+ private long currentBatchMemoryUsage = 0;
+ /** Flush threshold in bytes for the pending batch; a non-positive value disables it. */
+ private final long memoryLimit;
+
public OrcBulkWriter(
Vectorizer<InternalRow> vectorizer,
Writer writer,
PositionOutputStream underlyingStream,
- int batchSize) {
+ int batchSize,
+ MemorySize memoryLimit) {
this.vectorizer = checkNotNull(vectorizer);
this.writer = checkNotNull(writer);
this.rowBatch = vectorizer.getSchema().createRowBatch(batchSize);
this.underlyingStream = underlyingStream;
+ this.memoryLimit = checkNotNull(memoryLimit).getBytes();
}
@Override
public void addElement(InternalRow element) throws IOException {
- vectorizer.vectorize(element, rowBatch);
- if (rowBatch.size == rowBatch.getMaxSize()) {
+ currentBatchMemoryUsage += vectorizer.vectorize(element, rowBatch);
+ // A non-positive limit (e.g. MemorySize.ZERO) disables memory-based flushing;
+ // otherwise flush once the estimated size of the pending batch reaches the limit.
+ boolean memoryLimitReached =
+ memoryLimit > 0 && currentBatchMemoryUsage >= memoryLimit;
+ if (rowBatch.size == rowBatch.getMaxSize() || memoryLimitReached) {
flush();
}
}
@@ -62,6 +74,7 @@ public class OrcBulkWriter implements FormatWriter {
if (rowBatch.size != 0) {
writer.addRowBatch(rowBatch);
rowBatch.reset();
+ currentBatchMemoryUsage = 0;
}
}
@@ -88,4 +101,9 @@ public class OrcBulkWriter implements FormatWriter {
VectorizedRowBatch getRowBatch() {
return rowBatch;
}
+
+ @VisibleForTesting
+ long getMemoryLimit() {
+ return memoryLimit;
+ }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
index e36fbdfee9..5d9a3fc889 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
@@ -50,9 +50,11 @@ public class RowDataVectorizer extends
Vectorizer<InternalRow> {
}
}
@Override
- public void vectorize(InternalRow row, VectorizedRowBatch batch) {
+ public long vectorize(InternalRow row, VectorizedRowBatch batch) {
int rowId = batch.size++;
+ // Summed as a long to match the declared return type.
+ long memBytes = 0;
for (int i = 0; i < fieldNames.length; ++i) {
ColumnVector fieldColumn = batch.cols[i];
if (row.isNullAt(i)) {
@@ -69,8 +71,9 @@ public class RowDataVectorizer extends
Vectorizer<InternalRow> {
fieldColumn.noNulls = false;
fieldColumn.isNull[rowId] = true;
} else {
- fieldWriters[i].write(rowId, fieldColumn, row, i);
+ memBytes += fieldWriters[i].write(rowId, fieldColumn, row, i);
}
}
+ return memBytes;
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java
index 2add46531a..c73c9a1bdd 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/Vectorizer.java
@@ -59,7 +59,8 @@ public abstract class Vectorizer<T> implements Serializable {
*
* @param element The input element
* @param batch The batch to write the ColumnVectors
+ * @return An estimate, in bytes, of the data the element added to the batch
* @throws IOException if there is an error while transforming the input.
*/
- public abstract void vectorize(T element, VectorizedRowBatch batch) throws
IOException;
+ public abstract long vectorize(T element, VectorizedRowBatch batch) throws
IOException;
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
index 1510a93823..4690437c6a 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/orc/writer/OrcBulkWriterTest.java
@@ -27,6 +27,7 @@ import org.apache.paimon.format.orc.OrcWriterFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -43,6 +44,7 @@ class OrcBulkWriterTest {
void testRowBatch(@TempDir java.nio.file.Path tempDir) throws IOException {
Options options = new Options();
options.set(CoreOptions.WRITE_BATCH_SIZE, 1);
+ options.set(CoreOptions.WRITE_BATCH_MEMORY, MemorySize.parse("1 Kb"));
FileFormat orc = FileFormat.fromIdentifier("orc", options);
Assertions.assertThat(orc).isInstanceOf(OrcFileFormat.class);
@@ -62,5 +64,6 @@ class OrcBulkWriterTest {
OrcBulkWriter orcBulkWriter = (OrcBulkWriter) formatWriter;
Assertions.assertThat(orcBulkWriter.getRowBatch().getMaxSize()).isEqualTo(1);
+ Assertions.assertThat(orcBulkWriter.getMemoryLimit()).isEqualTo(1024);
}
}