This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d640774f9953 perf: use shallow projection where applicable (#17682)
d640774f9953 is described below
commit d640774f99532e5524b41b815e4c3fe7830d5f75
Author: Sergey Troshkov <[email protected]>
AuthorDate: Tue Dec 30 09:02:44 2025 +0700
perf: use shallow projection where applicable (#17682)
---
.../java/org/apache/hudi/io/HoodieCDCLogger.java | 17 +--------
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 40 ++++++++++++++++++++
.../apache/hudi/common/model/HoodieAvroRecord.java | 2 +-
.../scala/org/apache/hudi/AvroProjection.scala | 44 ----------------------
4 files changed, 43 insertions(+), 60 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index a833ac863da3..0d9b74a7f2dc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -18,6 +18,7 @@
package org.apache.hudi.io;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
@@ -44,7 +45,6 @@ import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -275,20 +275,7 @@ public class HoodieCDCLogger implements Closeable {
}
private GenericRecord removeCommitMetadata(GenericRecord record) {
- return record == null ? null : getRecordWithoutMetadata(record);
- }
-
- private GenericRecord getRecordWithoutMetadata(GenericRecord record) {
- Schema avroSchema = dataSchema.getAvroSchema();
- if (record.getSchema().getFields().size() == avroSchema.getFields().size()) {
- return record;
- } else {
- GenericData.Record rec = new GenericData.Record(avroSchema);
- for (Schema.Field field : avroSchema.getFields()) {
- rec.put(field.pos(), record.get(field.name()));
- }
- return rec;
- }
+ return record == null ? null : HoodieAvroUtils.projectRecordToNewSchemaShallow(record, dataSchema.getAvroSchema());
}
// -------------------------------------------------------------------------
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 0dce463eef0c..f2951fa05a02 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1370,6 +1370,53 @@ public class HoodieAvroUtils {
scale, new MathContext(precision, RoundingMode.HALF_UP));
}
+  /**
+   * Projects a record onto {@code targetSchema} by shallow-copying its top-level fields.
+   * Best used for adding or removing top-level metadata fields.
+   * <p>
+   * This is a high-performance alternative to deep rewriting: it only iterates over the
+   * top-level fields of the target schema and pulls each value from the source record by
+   * field name. It is significantly faster than {@link #rewriteRecordWithNewSchema} for:
+   * <ol>
+   *   <li>Wide records (many top-level fields): less CPU overhead, no recursion.</li>
+   *   <li>Deeply nested records: nested values are copied by reference instead of rebuilt.</li>
+   * </ol>
+   * <p>
+   * <b>Warning:</b> nested records, arrays and maps are NOT recursively rewritten or
+   * transformed; the caller must ensure each copied value is already compatible with the
+   * corresponding field of {@code targetSchema}. The returned record shares those nested
+   * values with {@code record}, so mutating one is visible through the other.
+   * <p>
+   * Target fields absent from the source record receive the field's declared default value,
+   * or {@code null} when the field declares no default.
+   *
+   * @param record       the source record to project
+   * @param targetSchema the schema to project the record into
+   * @return {@code record} itself when it is a {@link GenericRecord} whose schema equals
+   *         {@code targetSchema}; otherwise a new {@link GenericData.Record} of {@code targetSchema}
+   */
+  public static GenericRecord projectRecordToNewSchemaShallow(IndexedRecord record, Schema targetSchema) {
+    Schema sourceSchema = record.getSchema();
+    // Compare full schemas, not just field counts: two schemas with the same number of fields
+    // may differ in names or order, and such a record still has to be projected.
+    // Schema#equals returns immediately when both references point to the same instance.
+    if (record instanceof GenericRecord && sourceSchema.equals(targetSchema)) {
+      return (GenericRecord) record;
+    }
+    GenericData.Record projected = new GenericData.Record(targetSchema);
+    for (Schema.Field targetField : targetSchema.getFields()) {
+      Schema.Field sourceField = sourceSchema.getField(targetField.name());
+      if (sourceField != null) {
+        projected.put(targetField.pos(), record.get(sourceField.pos()));
+      } else if (targetField.hasDefaultValue()) {
+        // Avoid silently nulling a (possibly non-nullable) field that declares a default.
+        projected.put(targetField.pos(), GenericData.get().getDefaultValue(targetField));
+      }
+      // Otherwise the slot stays null: GenericData.Record starts with every field set to null.
+    }
+    return projected;
+  }
+
/**
* Avro does not support type promotion from numbers to string. This function returns true if
* it will be necessary to rewrite the record to support this promotion.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 2b3822851feb..5337cb86f13e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -152,7 +152,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
try {
Option<IndexedRecord> avroRecordOpt = getData().getInsertValue(recordSchema, props);
- GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecordOpt.get(), targetSchema);
+ GenericRecord newAvroRecord = HoodieAvroUtils.projectRecordToNewSchemaShallow(avroRecordOpt.get(), targetSchema);
updateMetadataValuesInternal(newAvroRecord, metadataValues);
return new HoodieAvroIndexedRecord(getKey(), newAvroRecord, getOperation(), this.currentLocation, this.newLocation);
} catch (IOException e) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala
deleted file mode 100644
index 06b465a2e18c..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-
-abstract class AvroProjection extends (GenericRecord => GenericRecord)
-
-object AvroProjection {
-
- /**
- * Creates projection into provided [[Schema]] allowing to convert [[GenericRecord]] into
- * new schema
- */
- def create(schema: Schema): AvroProjection = {
- val projection = (record: GenericRecord) => rewriteRecordWithNewSchema(record, schema)
- (record: GenericRecord) => if (record.getSchema == schema) {
- record
- } else {
- projection(record)
- }
- }
-
-}