This is an automated email from the ASF dual-hosted git repository.

wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d640774f9953 perf: use shallow projection where applicable (#17682)
d640774f9953 is described below

commit d640774f99532e5524b41b815e4c3fe7830d5f75
Author: Sergey Troshkov <[email protected]>
AuthorDate: Tue Dec 30 09:02:44 2025 +0700

    perf: use shallow projection where applicable (#17682)
---
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   | 17 +--------
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 40 ++++++++++++++++++++
 .../apache/hudi/common/model/HoodieAvroRecord.java |  2 +-
 .../scala/org/apache/hudi/AvroProjection.scala     | 44 ----------------------
 4 files changed, 43 insertions(+), 60 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index a833ac863da3..0d9b74a7f2dc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.io;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -44,7 +45,6 @@ import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -275,20 +275,7 @@ public class HoodieCDCLogger implements Closeable {
   }
 
   private GenericRecord removeCommitMetadata(GenericRecord record) {
-    return record == null ? null : getRecordWithoutMetadata(record);
-  }
-
-  private GenericRecord getRecordWithoutMetadata(GenericRecord record) {
-    Schema avroSchema = dataSchema.getAvroSchema();
-    if (record.getSchema().getFields().size() == avroSchema.getFields().size()) {
-      return record;
-    } else {
-      GenericData.Record rec = new GenericData.Record(avroSchema);
-      for (Schema.Field field : avroSchema.getFields()) {
-        rec.put(field.pos(), record.get(field.name()));
-      }
-      return rec;
-    }
+    return record == null ? null : HoodieAvroUtils.projectRecordToNewSchemaShallow(record, dataSchema.getAvroSchema());
   }
 
   // -------------------------------------------------------------------------
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 0dce463eef0c..f2951fa05a02 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1370,6 +1370,46 @@ public class HoodieAvroUtils {
         scale, new MathContext(precision, RoundingMode.HALF_UP));
   }
 
+  /**
+   * Projects a record to a new schema by performing a shallow copy of fields.
+   * Best used for adding or removing top-level metadata fields.
+   * <p>
+   * This is a high-performance alternative to deep rewriting. It only iterates through
+   * the top-level fields of the target schema and pulls values from the source record
+   * by field name.
+   * <p>
+   * <p>
+   * This is significantly faster than {@link #rewriteRecordWithNewSchema} for:
+   * 1. Wide records (many top-level fields): Reduces CPU overhead/recursion.
+   * 2. Deeply nested records: Uses reference-copying for nested structures instead of rebuilding them.
+   * <p>
+   * <b>Warning:</b> This method does not recursively rewrite/transform nested records, arrays,
+   * or maps. It assumes that the underlying values for each field are already
+   * compatible with the target schema.
+   *
+   * @param record      The source GenericRecord to project.
+   * @param targetSchema The schema to project the record into.
+   * @return A new GenericRecord matching targetSchema, or the original record if
+   * the schemas are identical in field count.
+   */
+
+  public static GenericRecord projectRecordToNewSchemaShallow(IndexedRecord record, Schema targetSchema) {
+    if (record.getSchema().getFields().size() == targetSchema.getFields().size()) {
+      return (GenericRecord) record;
+    } else {
+      GenericData.Record rec = new GenericData.Record(targetSchema);
+      for (Schema.Field field : targetSchema.getFields()) {
+        Field sourceField = record.getSchema().getField(field.name());
+        if (sourceField == null) {
+          rec.put(field.pos(), null);
+        } else {
+          rec.put(field.pos(), record.get(sourceField.pos()));
+        }
+      }
+      return rec;
+    }
+  }
+
   /**
    * Avro does not support type promotion from numbers to string. This function returns true if
    * it will be necessary to rewrite the record to support this promotion.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 2b3822851feb..5337cb86f13e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -152,7 +152,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> extends HoodieRecor
   public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
     try {
       Option<IndexedRecord> avroRecordOpt = getData().getInsertValue(recordSchema, props);
-      GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecordOpt.get(), targetSchema);
+      GenericRecord newAvroRecord = HoodieAvroUtils.projectRecordToNewSchemaShallow(avroRecordOpt.get(), targetSchema);
       updateMetadataValuesInternal(newAvroRecord, metadataValues);
       return new HoodieAvroIndexedRecord(getKey(), newAvroRecord, getOperation(), this.currentLocation, this.newLocation);
     } catch (IOException e) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala
deleted file mode 100644
index 06b465a2e18c..000000000000
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/AvroProjection.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hudi
-
-import org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema
-
-import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-
-abstract class AvroProjection extends (GenericRecord => GenericRecord)
-
-object AvroProjection {
-
-  /**
-   * Creates projection into provided [[Schema]] allowing to convert [[GenericRecord]] into
-   * new schema
-   */
-  def create(schema: Schema): AvroProjection = {
-    val projection = (record: GenericRecord) => rewriteRecordWithNewSchema(record, schema)
-    (record: GenericRecord) => if (record.getSchema == schema) {
-      record
-    } else {
-      projection(record)
-    }
-  }
-
-}

Reply via email to