This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6428cb258366 perf(spark): Resolve drop-partition-columns projection once per writer instead of per row (#18972)
6428cb258366 is described below
commit 6428cb258366df5fdba79dad35c7c8b9933aa8a2
Author: voonhous <[email protected]>
AuthorDate: Sat Jun 13 12:04:56 2026 +0800
perf(spark): Resolve drop-partition-columns projection once per writer instead of per row (#18972)
BulkInsertDataInternalWriterHelper#write redid constant work for every
row when hoodie.datasource.write.drop.partition.columns is enabled:
resolving the config flag, instantiating a key generator via
constructor reflection through getPartitionPathCols, recomputing the
partition-column ordinals into a fresh HashSet, and round-tripping the
whole row through toSeq/fromSeq (boxing every column).
The flag is now resolved once in the constructor, and the retained
(non-partition) field ordinals and types are computed once, on the
first write(). Lazy initialization keeps partition-column resolution
unreachable, exactly as before, for the bucket-index subclasses (which
override write() and never drop columns) and for tasks that write no
rows. write() copies the retained fields into a fresh GenericInternalRow,
whose values are identical to the previous toSeq/filter/fromSeq output.
---
.../commit/BulkInsertDataInternalWriterHelper.java | 64 ++++++++++++++++------
1 file changed, 46 insertions(+), 18 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 57847faedb82..712a27f81833 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -32,6 +32,7 @@ import org.apache.hudi.util.JavaScalaConverters;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
@@ -69,6 +70,13 @@ public class BulkInsertDataInternalWriterHelper {
protected final boolean simpleKeyGen;
protected final int simplePartitionFieldIndex;
protected final DataType simplePartitionFieldDataType;
+ protected final boolean shouldDropPartitionColumns;
+ // Ordinals and types of the non-partition fields, computed once on the first write() instead of
+ // in the constructor: bucket-index subclasses override write() and never drop columns, and the
+ // partition-column resolution must stay unreachable for them (and for tasks that write no rows)
+ // exactly as before. The helper is confined to a single task thread, so plain lazy init is safe.
+ private int[] retainedOrdinals;
+ private DataType[] retainedTypes;
/**
* NOTE: This is stored as Catalyst's internal {@link UTF8String} to avoid
* conversion (deserialization) b/w {@link UTF8String} and {@link
String}
@@ -114,6 +122,36 @@ public class BulkInsertDataInternalWriterHelper {
this.simplePartitionFieldIndex = -1;
this.simplePartitionFieldDataType = null;
}
+
+ this.shouldDropPartitionColumns = writeConfig.shouldDropPartitionColumns();
+ }
+
+ /**
+ * Resolves the ordinals and types of the non-partition fields. The partition columns are a pure
+ * function of the write config and schema, both immutable for the helper's lifetime, so this
+ * runs once per helper instead of once per row (getPartitionPathCols instantiates a key
+ * generator reflectively).
+ */
+ private void initRetainedFields() {
+ List<String> partitionCols = JavaScalaConverters.convertScalaListToJavaList(
+ HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
+ Set<Integer> partitionIdx = new HashSet<>();
+ for (String col : partitionCols) {
+ partitionIdx.add(this.structType.fieldIndex(col));
+ }
+ int numRetained = structType.fields().length - partitionIdx.size();
+ int[] ordinals = new int[numRetained];
+ DataType[] types = new DataType[numRetained];
+ int retained = 0;
+ for (int i = 0; i < structType.fields().length; i++) {
+ if (!partitionIdx.contains(i)) {
+ ordinals[retained] = i;
+ types[retained] = structType.fields()[i].dataType();
+ retained++;
+ }
+ }
+ this.retainedOrdinals = ordinals;
+ this.retainedTypes = types;
}
public void write(InternalRow row) throws IOException {
@@ -126,27 +164,17 @@ public class BulkInsertDataInternalWriterHelper {
lastKnownPartitionPath = partitionPath.clone();
}
- boolean shouldDropPartitionColumns =
writeConfig.shouldDropPartitionColumns();
if (shouldDropPartitionColumns) {
- // Drop the partition columns from the row
- List<String> partitionCols =
JavaScalaConverters.convertScalaListToJavaList(HoodieDatasetBulkInsertHelper.getPartitionPathCols(this.writeConfig));
- Set<Integer> partitionIdx = new HashSet<>();
- for (String col : partitionCols) {
- partitionIdx.add(this.structType.fieldIndex(col));
+ if (retainedOrdinals == null) {
+ initRetainedFields();
}
-
- // Relies on InternalRow::toSeq(...) preserving the column ordering
based on the supplied schema
- List<Object> cols =
JavaScalaConverters.convertScalaListToJavaList(row.toSeq(structType));
- int idx = 0;
- List<Object> newCols = new ArrayList<>();
- for (Object o : cols) {
- if (!partitionIdx.contains(idx)) {
- newCols.add(o);
- }
- idx += 1;
+ // Drop the partition columns from the row by copying the retained
fields; a fresh row is
+ // allocated per record so values keep the same aliasing behavior as
InternalRow.fromSeq
+ Object[] values = new Object[retainedOrdinals.length];
+ for (int i = 0; i < retainedOrdinals.length; i++) {
+ values[i] = row.get(retainedOrdinals[i], retainedTypes[i]);
}
- InternalRow newRow =
InternalRow.fromSeq(JavaScalaConverters.convertJavaListToScalaSeq(newCols));
- handle.write(newRow);
+ handle.write(new GenericInternalRow(values));
} else {
handle.write(row);
}