This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f1f6f93fe0f6e88b68e51b9b5703e9ae34030a95
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 17 11:31:17 2024 +0800

    [HUDI-7578] Avoid unnecessary rewriting to improve performance (#11028)
---
 .../src/main/java/org/apache/hudi/io/HoodieMergeHandle.java | 13 +++++--------
 .../org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java  |  2 +-
 .../java/org/apache/hudi/io/HoodieSortedMergeHandle.java    |  4 ++--
 .../hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java    |  2 +-
 .../org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java   |  2 +-
 .../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java |  4 ++++
 6 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index e40a5585067..749b08c3e7e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -103,7 +103,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter fileWriter;
-  private boolean preserveMetadata = false;
+  protected boolean preserveMetadata = false;
 
   protected Path newFilePath;
   protected Path oldFilePath;
@@ -111,7 +111,6 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
   protected long recordsDeleted = 0;
   protected long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
-  protected boolean useWriterSchemaForCompaction;
   protected Option<BaseKeyGenerator> keyGeneratorOpt;
   private HoodieBaseFile baseFileToMerge;
 
@@ -142,7 +141,6 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
                            HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
-    this.useWriterSchemaForCompaction = true;
     this.preserveMetadata = true;
     init(fileId, this.partitionPath, dataFileToBeMerged);
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
@@ -279,7 +277,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
   }
 
   protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : 
writeSchema;
+    Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
     // just skip the ignored record
     if (newRecord.shouldIgnore(schema, config.getProps())) {
       return;
@@ -308,7 +306,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     }
     try {
       if (combineRecord.isPresent() && !combineRecord.get().isDelete(schema, 
config.getProps()) && !isDelete) {
-        writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, 
preserveMetadata && useWriterSchemaForCompaction);
+        writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, 
preserveMetadata);
         recordsWritten++;
       } else {
         recordsDeleted++;
@@ -335,7 +333,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
    */
   public void write(HoodieRecord<T> oldRecord) {
     Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields 
: writeSchema;
-    Schema newSchema = useWriterSchemaForCompaction ? 
writeSchemaWithMetaFields : writeSchema;
+    Schema newSchema = preserveMetadata ? writeSchemaWithMetaFields : 
writeSchema;
     boolean copyOldRecord = true;
     String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
     TypedProperties props = config.getPayloadConfig().getProps();
@@ -384,8 +382,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O>
     // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point 
to the
     //       file holding this record even in cases when overall metadata is 
preserved
     MetadataValues metadataValues = new 
MetadataValues().setFileName(newFilePath.getName());
-    HoodieRecord populatedRecord =
-        record.prependMetaFields(schema, writeSchemaWithMetaFields, 
metadataValues, prop);
+    HoodieRecord populatedRecord = record.prependMetaFields(schema, 
writeSchemaWithMetaFields, metadataValues, prop);
 
     if (shouldPreserveRecordMetadata) {
       fileWriter.write(key.getRecordKey(), populatedRecord, 
writeSchemaWithMetaFields);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index f8669416f0c..fba72310513 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -99,7 +99,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
   }
 
   protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : 
writeSchema;
+    Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
     // TODO Remove these unnecessary newInstance invocations
     HoodieRecord<T> savedRecord = newRecord.newInstance();
     super.writeInsertRecord(newRecord);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 3d3a7308bb3..ee0ee914e19 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -74,7 +74,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
   @Override
   public void write(HoodieRecord oldRecord) {
     Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields 
: writeSchema;
-    Schema newSchema = useWriterSchemaForCompaction ? 
writeSchemaWithMetaFields : writeSchema;
+    Schema newSchema = preserveMetadata ? writeSchemaWithMetaFields : 
writeSchema;
     String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
 
     // To maintain overall sorted order across updates and inserts, write any 
new inserts whose keys are less than
@@ -111,7 +111,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends HoodieMergeHandle<T, I,
         String key = newRecordKeysSorted.poll();
         HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
         if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
-          if (useWriterSchemaForCompaction) {
+          if (preserveMetadata) {
             writeRecord(hoodieRecord, Option.of(hoodieRecord), 
writeSchemaWithMetaFields, config.getProps());
           } else {
             writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchema, 
config.getProps());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index 666c0a8f3fd..85fb5a43504 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -83,7 +83,7 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
   }
 
   protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : 
writeSchema;
+    Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
     // TODO Remove these unnecessary newInstance invocations
     HoodieRecord<T> savedRecord = newRecord.newInstance();
     super.writeInsertRecord(newRecord);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 7d19f454a92..92335d0965d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -81,7 +81,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
   }
 
   protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : 
writeSchema;
+    Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
     // TODO Remove these unnecessary newInstance invocations
     HoodieRecord<T> savedRecord = newRecord.newInstance();
     super.writeInsertRecord(newRecord);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 189c988dbc3..70ec37639d8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -934,6 +934,10 @@ public class HoodieAvroUtils {
     if (oldRecord == null) {
       return null;
     }
+    if (oldAvroSchema.equals(newSchema)) {
+      // there is no need to rewrite if the schema equals.
+      return oldRecord;
+    }
     // try to get real schema for union type
     Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
     Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, 
oldSchema, newSchema, renameCols, fieldNames);

Reply via email to