This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8896864  [HUDI-3678] Fix record rewrite of create handle when 'preserveMetadata' is true (#5088)
8896864 is described below

commit 8896864d7b8e8c1a9bf0e2a05353f70a1fabdf22
Author: Danny Chan <[email protected]>
AuthorDate: Fri Mar 25 11:48:50 2022 +0800

    [HUDI-3678] Fix record rewrite of create handle when 'preserveMetadata' is true (#5088)
---
 .../org/apache/hudi/io/HoodieCreateHandle.java     | 22 ++++++++++------------
 .../java/org/apache/hudi/io/HoodieMergeHandle.java | 20 ++++++++------------
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |  4 ++--
 .../metadata/HoodieBackedTableMetadataWriter.java  |  6 +++++-
 .../hudi/client/TestUpdateSchemaEvolution.java     |  2 ++
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 18 ++++--------------
 6 files changed, 31 insertions(+), 41 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 3e7e0b1..0bc3491 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -59,7 +59,7 @@ public class HoodieCreateHandle<T extends 
HoodieRecordPayload, I, K, O> extends
   protected long recordsDeleted = 0;
   private Map<String, HoodieRecord<T>> recordMap;
   private boolean useWriterSchema = false;
-  private boolean preserveHoodieMetadata = false;
+  private final boolean preserveMetadata;
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
                             String partitionPath, String fileId, 
TaskContextSupplier taskContextSupplier) {
@@ -69,9 +69,9 @@ public class HoodieCreateHandle<T extends 
HoodieRecordPayload, I, K, O> extends
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
                             String partitionPath, String fileId, 
TaskContextSupplier taskContextSupplier,
-                            boolean preserveHoodieMetadata) {
+                            boolean preserveMetadata) {
     this(config, instantTime, hoodieTable, partitionPath, fileId, 
Option.empty(),
-        taskContextSupplier, preserveHoodieMetadata);
+        taskContextSupplier, preserveMetadata);
   }
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
@@ -82,10 +82,10 @@ public class HoodieCreateHandle<T extends 
HoodieRecordPayload, I, K, O> extends
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
                             String partitionPath, String fileId, 
Option<Schema> overriddenSchema,
-                            TaskContextSupplier taskContextSupplier, boolean 
preserveHoodieMetadata) {
+                            TaskContextSupplier taskContextSupplier, boolean 
preserveMetadata) {
     super(config, instantTime, partitionPath, fileId, hoodieTable, 
overriddenSchema,
         taskContextSupplier);
-    this.preserveHoodieMetadata = preserveHoodieMetadata;
+    this.preserveMetadata = preserveMetadata;
     writeStatus.setFileId(fileId);
     writeStatus.setPartitionPath(partitionPath);
     writeStatus.setStat(new HoodieWriteStat());
@@ -111,7 +111,7 @@ public class HoodieCreateHandle<T extends 
HoodieRecordPayload, I, K, O> extends
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
       String partitionPath, String fileId, Map<String, HoodieRecord<T>> 
recordMap,
       TaskContextSupplier taskContextSupplier) {
-    this(config, instantTime, hoodieTable, partitionPath, fileId, 
taskContextSupplier);
+    this(config, instantTime, hoodieTable, partitionPath, fileId, 
taskContextSupplier, config.isPreserveHoodieCommitMetadataForCompaction());
     this.recordMap = recordMap;
     this.useWriterSchema = true;
   }
@@ -137,13 +137,11 @@ public class HoodieCreateHandle<T extends 
HoodieRecordPayload, I, K, O> extends
           return;
         }
         // Convert GenericRecord to GenericRecord with hoodie commit metadata 
in schema
-        IndexedRecord recordWithMetadataInSchema = 
rewriteRecord((GenericRecord) avroRecord.get());
-        if (preserveHoodieMetadata) {
-          // do not preserve FILENAME_METADATA_FIELD
-          
recordWithMetadataInSchema.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD),
 path.getName());
-          fileWriter.writeAvro(record.getRecordKey(), 
recordWithMetadataInSchema);
+        if (preserveMetadata) {
+          fileWriter.writeAvro(record.getRecordKey(),
+              rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), 
path.getName()));
         } else {
-          fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
+          fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) 
avroRecord.get()), record);
         }
         // update the new location of record, so we know where to find it next
         record.unseal();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index d38f66a..cbcf382 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -61,8 +61,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static 
org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD_POS;
-
 @SuppressWarnings("Duplicates")
 /**
  * Handle to merge incoming records to those in storage.
@@ -264,7 +262,7 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
         isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete, oldRecord);
+    return writeRecord(hoodieRecord, indexedRecord, isDelete);
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws 
IOException {
@@ -274,16 +272,16 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
     if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
       return;
     }
-    if (writeRecord(hoodieRecord, insertRecord, 
HoodieOperation.isDelete(hoodieRecord.getOperation()), null)) {
+    if (writeRecord(hoodieRecord, insertRecord, 
HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
       insertRecordsWritten++;
     }
   }
 
   protected boolean writeRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> indexedRecord) {
-    return writeRecord(hoodieRecord, indexedRecord, false, null);
+    return writeRecord(hoodieRecord, indexedRecord, false);
   }
 
-  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> indexedRecord, boolean isDelete, GenericRecord oldRecord) 
{
+  protected boolean writeRecord(HoodieRecord<T> hoodieRecord, 
Option<IndexedRecord> indexedRecord, boolean isDelete) {
     Option recordMetadata = hoodieRecord.getData().getMetadata();
     if (!partitionPath.equals(hoodieRecord.getPartitionPath())) {
       HoodieUpsertException failureEx = new HoodieUpsertException("mismatched 
partition path, record partition: "
@@ -294,13 +292,11 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload, I, K, O> extends H
     try {
       if (indexedRecord.isPresent() && !isDelete) {
         // Convert GenericRecord to GenericRecord with hoodie commit metadata 
in schema
-        IndexedRecord recordWithMetadataInSchema = 
rewriteRecord((GenericRecord) indexedRecord.get(), preserveMetadata, oldRecord);
-        if (preserveMetadata && useWriterSchema) { // useWriteSchema will be 
true only incase of compaction.
-          // do not preserve FILENAME_METADATA_FIELD
-          recordWithMetadataInSchema.put(FILENAME_METADATA_FIELD_POS, 
newFilePath.getName());
-          fileWriter.writeAvro(hoodieRecord.getRecordKey(), 
recordWithMetadataInSchema);
+        if (preserveMetadata && useWriterSchema) { // useWriteSchema will be 
true only in case of compaction.
+          fileWriter.writeAvro(hoodieRecord.getRecordKey(),
+              rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), 
newFilePath.getName()));
         } else {
-          fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, 
hoodieRecord);
+          fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) 
indexedRecord.get()), hoodieRecord);
         }
         recordsWritten++;
       } else {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index b7e2d6a..89babc7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -227,8 +227,8 @@ public abstract class HoodieWriteHandle<T extends 
HoodieRecordPayload, I, K, O>
     return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields);
   }
 
-  protected GenericRecord rewriteRecord(GenericRecord record, boolean 
copyOverMetaFields, GenericRecord fallbackRecord) {
-    return HoodieAvroUtils.rewriteRecord(record, writeSchemaWithMetaFields, 
copyOverMetaFields, fallbackRecord);
+  protected GenericRecord rewriteRecordWithMetadata(GenericRecord record, 
String fileName) {
+    return HoodieAvroUtils.rewriteRecordWithMetadata(record, 
writeSchemaWithMetaFields, fileName);
   }
 
   public abstract List<WriteStatus> close();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 2f4bca8..ce167f7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -259,7 +259,11 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
             .withInlineCompaction(false)
             
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
             // we will trigger archive manually, to ensure only regular writer 
invokes it
-            .withAutoArchive(false).build())
+            .withAutoArchive(false)
+            // by default, the HFile does not keep the metadata fields, set up 
as false
+            // to always use the metadata of the new record.
+            .withPreserveCommitMetadata(false)
+            .build())
         .withParallelism(parallelism, parallelism)
         .withDeleteParallelism(parallelism)
         .withRollbackParallelism(parallelism)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 70f5e9f..a592619 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
@@ -77,6 +78,7 @@ public class TestUpdateSchemaEvolution extends 
HoodieClientTestHarness {
   private WriteStatus prepareFirstRecordCommit(List<String> recordsStrs) 
throws IOException {
     // Create a bunch of records with an old version of schema
     final HoodieWriteConfig config = 
makeHoodieClientConfig("/exampleSchema.avsc");
+    config.setValue(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA, "false");
     final HoodieSparkTable table = HoodieSparkTable.create(config, context);
     final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x 
-> {
       List<HoodieRecord> insertRecords = new ArrayList<>();
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 5cb18dc..e427422 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -382,23 +382,13 @@ public class HoodieAvroUtils {
     return newRecord;
   }
 
-  public static GenericRecord rewriteRecord(GenericRecord genericRecord, 
Schema newSchema, boolean copyOverMetaFields, GenericRecord fallbackRecord) {
+  public static GenericRecord rewriteRecordWithMetadata(GenericRecord 
genericRecord, Schema newSchema, String fileName) {
     GenericRecord newRecord = new GenericData.Record(newSchema);
-    boolean isSpecificRecord = genericRecord instanceof SpecificRecordBase;
     for (Schema.Field f : newSchema.getFields()) {
-      if (!(isSpecificRecord && isMetadataField(f.name()))) {
-        copyOldValueOrSetDefault(genericRecord, newRecord, f);
-      }
-      if (isMetadataField(f.name()) && copyOverMetaFields) {
-        // if meta field exists in primary generic record, copy over.
-        if (genericRecord.getSchema().getField(f.name()) != null) {
-          copyOldValueOrSetDefault(genericRecord, newRecord, f);
-        } else if (fallbackRecord != null && 
fallbackRecord.getSchema().getField(f.name()) != null) {
-          // if not, try to copy from the fallback record.
-          copyOldValueOrSetDefault(fallbackRecord, newRecord, f);
-        }
-      }
+      copyOldValueOrSetDefault(genericRecord, newRecord, f);
     }
+    // do not preserve FILENAME_METADATA_FIELD
+    newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
     if (!GenericData.get().validate(newSchema, newRecord)) {
       throw new SchemaCompatibilityException(
           "Unable to validate the rewritten record " + genericRecord + " 
against schema " + newSchema);

Reply via email to