This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 876c4e26ecf [HUDI-7578] Avoid unnecessary rewriting to improve 
performance (#11028)
876c4e26ecf is described below

commit 876c4e26ecf2b710e37e826f583cbf7c5722f246
Author: Danny Chan <[email protected]>
AuthorDate: Wed Apr 17 11:31:17 2024 +0800

    [HUDI-7578] Avoid unnecessary rewriting to improve performance (#11028)
---
 .../src/main/java/org/apache/hudi/io/HoodieMergeHandle.java | 13 +++++--------
 .../org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java  |  2 +-
 .../java/org/apache/hudi/io/HoodieSortedMergeHandle.java    |  4 ++--
 .../hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java    |  2 +-
 .../org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java   |  2 +-
 .../src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java |  4 ++++
 6 files changed, 14 insertions(+), 13 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 4f5f240c4fd..095b3986e4a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -104,7 +104,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter fileWriter;
-  private boolean preserveMetadata = false;
+  protected boolean preserveMetadata = false;
 
   protected Path newFilePath;
   protected Path oldFilePath;
@@ -112,7 +112,6 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
   protected long recordsDeleted = 0;
   protected long updatedRecordsWritten = 0;
   protected long insertRecordsWritten = 0;
-  protected boolean useWriterSchemaForCompaction;
   protected Option<BaseKeyGenerator> keyGeneratorOpt;
   private HoodieBaseFile baseFileToMerge;
 
@@ -143,7 +142,6 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
                            HoodieBaseFile dataFileToBeMerged, 
TaskContextSupplier taskContextSupplier, Option<BaseKeyGenerator> 
keyGeneratorOpt) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, 
taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
-    this.useWriterSchemaForCompaction = true;
     this.preserveMetadata = true;
     init(fileId, this.partitionPath, dataFileToBeMerged);
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
@@ -280,7 +278,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
   }
 
   protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : 
writeSchema;
+    Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
     // just skip the ignored record
     if (newRecord.shouldIgnore(schema, config.getProps())) {
       return;
@@ -313,7 +311,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
         boolean decision = recordMerger.shouldFlush(combineRecord.get(), 
schema, config.getProps());
 
         if (decision) { // CASE (1): Flush the merged record.
-          writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, 
preserveMetadata && useWriterSchemaForCompaction);
+          writeToFile(newRecord.getKey(), combineRecord.get(), schema, prop, 
preserveMetadata);
           recordsWritten++;
         } else {  // CASE (2): A delete operation.
           recordsDeleted++;
@@ -343,7 +341,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
    */
   public void write(HoodieRecord<T> oldRecord) {
     Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields 
: writeSchema;
-    Schema newSchema = useWriterSchemaForCompaction ? 
writeSchemaWithMetaFields : writeSchema;
+    Schema newSchema = preserveMetadata ? writeSchemaWithMetaFields : 
writeSchema;
     boolean copyOldRecord = true;
     String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
     TypedProperties props = config.getPayloadConfig().getProps();
@@ -392,8 +390,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
     // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point 
to the
     //       file holding this record even in cases when overall metadata is 
preserved
     MetadataValues metadataValues = new 
MetadataValues().setFileName(newFilePath.getName());
-    HoodieRecord populatedRecord =
-        record.prependMetaFields(schema, writeSchemaWithMetaFields, 
metadataValues, prop);
+    HoodieRecord populatedRecord = record.prependMetaFields(schema, 
writeSchemaWithMetaFields, metadataValues, prop);
 
     if (shouldPreserveRecordMetadata) {
       fileWriter.write(key.getRecordKey(), populatedRecord, 
writeSchemaWithMetaFields);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index f8669416f0c..fba72310513 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -99,7 +99,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> 
extends HoodieMergeHandl
   }
 
   protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : 
writeSchema;
+    Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
     // TODO Remove these unnecessary newInstance invocations
     HoodieRecord<T> savedRecord = newRecord.newInstance();
     super.writeInsertRecord(newRecord);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
index 3d3a7308bb3..ee0ee914e19 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
@@ -74,7 +74,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends 
HoodieMergeHandle<T, I,
   @Override
   public void write(HoodieRecord oldRecord) {
     Schema oldSchema = config.populateMetaFields() ? writeSchemaWithMetaFields 
: writeSchema;
-    Schema newSchema = useWriterSchemaForCompaction ? 
writeSchemaWithMetaFields : writeSchema;
+    Schema newSchema = preserveMetadata ? writeSchemaWithMetaFields : 
writeSchema;
     String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
 
     // To maintain overall sorted order across updates and inserts, write any 
new inserts whose keys are less than
@@ -111,7 +111,7 @@ public class HoodieSortedMergeHandle<T, I, K, O> extends 
HoodieMergeHandle<T, I,
         String key = newRecordKeysSorted.poll();
         HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
         if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
-          if (useWriterSchemaForCompaction) {
+          if (preserveMetadata) {
             writeRecord(hoodieRecord, Option.of(hoodieRecord), 
writeSchemaWithMetaFields, config.getProps());
           } else {
             writeRecord(hoodieRecord, Option.of(hoodieRecord), writeSchema, 
config.getProps());
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index 666c0a8f3fd..85fb5a43504 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -83,7 +83,7 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, 
O>
   }
 
   protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : 
writeSchema;
+    Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
     // TODO Remove these unnecessary newInstance invocations
     HoodieRecord<T> savedRecord = newRecord.newInstance();
     super.writeInsertRecord(newRecord);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 7d19f454a92..92335d0965d 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -81,7 +81,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
   }
 
   protected void writeInsertRecord(HoodieRecord<T> newRecord) throws 
IOException {
-    Schema schema = useWriterSchemaForCompaction ? writeSchemaWithMetaFields : 
writeSchema;
+    Schema schema = preserveMetadata ? writeSchemaWithMetaFields : writeSchema;
     // TODO Remove these unnecessary newInstance invocations
     HoodieRecord<T> savedRecord = newRecord.newInstance();
     super.writeInsertRecord(newRecord);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 2172c7b1ae0..be149a046bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -932,6 +932,10 @@ public class HoodieAvroUtils {
     if (oldRecord == null) {
       return null;
     }
+    if (oldAvroSchema.equals(newSchema)) {
+      // there is no need to rewrite if the schema equals.
+      return oldRecord;
+    }
     // try to get real schema for union type
     Schema oldSchema = getActualSchemaFromUnion(oldAvroSchema, oldRecord);
     Object newRecord = rewriteRecordWithNewSchemaInternal(oldRecord, 
oldSchema, newSchema, renameCols, fieldNames);

Reply via email to the mailing list.