This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 271a71b648e [HUDI-6560] Avoid reading instant details twice during archiving (#9227)
271a71b648e is described below

commit 271a71b648e06c603e83cb454c943907129696a9
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jul 19 19:05:33 2023 +0800

    [HUDI-6560] Avoid reading instant details twice during archiving (#9227)
---
 .../apache/hudi/client/HoodieTimelineArchiver.java | 12 +----
 .../hudi/client/utils/MetadataConversionUtils.java | 51 ++++++++++++----------
 .../org/apache/hudi/common/util/CleanerUtils.java  | 26 +++++++++++
 .../apache/hudi/common/util/CompactionUtils.java   | 10 ++++-
 4 files changed, 62 insertions(+), 37 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
index b1a82c1ed03..3d41e3011bd 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java
@@ -686,13 +686,7 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
       for (HoodieInstant hoodieInstant : instants) {
         try {
           deleteAnyLeftOverMarkers(context, hoodieInstant);
-          // in local FS and HDFS, there could be empty completed instants due 
to crash.
-          if (table.getActiveTimeline().isEmpty(hoodieInstant) && 
hoodieInstant.isCompleted()) {
-            // lets add an entry to the archival, even if not for the plan.
-            records.add(createAvroRecordFromEmptyInstant(hoodieInstant));
-          } else {
-            records.add(convertToAvroRecord(hoodieInstant));
-          }
+          records.add(convertToAvroRecord(hoodieInstant));
           if (records.size() >= this.config.getCommitArchivalBatchSize()) {
             writeToFile(wrapperSchema, records);
           }
@@ -732,8 +726,4 @@ public class HoodieTimelineArchiver<T extends 
HoodieAvroPayload, I, K, O> {
       throws IOException {
     return MetadataConversionUtils.createMetaWrapper(hoodieInstant, 
metaClient);
   }
-
-  private IndexedRecord createAvroRecordFromEmptyInstant(HoodieInstant 
hoodieInstant) throws IOException {
-    return 
MetadataConversionUtils.createMetaWrapperForEmptyInstant(hoodieInstant);
-  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index e068d4b0432..cfd47ab2b37 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -47,6 +47,12 @@ import org.apache.hudi.common.util.Option;
 public class MetadataConversionUtils {
 
   public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant 
hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
+    Option<byte[]> instantDetails = 
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant);
+    if (hoodieInstant.isCompleted() && instantDetails.get().length == 0) {
+      // in local FS and HDFS, there could be empty completed instants due to 
crash.
+      // let's add an entry to the archival, even if not for the plan.
+      return createMetaWrapperForEmptyInstant(hoodieInstant);
+    }
     HoodieArchivedMetaEntry archivedMetaWrapper = new 
HoodieArchivedMetaEntry();
     archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
     archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
@@ -54,44 +60,41 @@ public class MetadataConversionUtils {
     switch (hoodieInstant.getAction()) {
       case HoodieTimeline.CLEAN_ACTION: {
         if (hoodieInstant.isCompleted()) {
-          
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 hoodieInstant));
+          
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 instantDetails.get()));
         } else {
-          
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 hoodieInstant));
+          
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 instantDetails.get()));
         }
         archivedMetaWrapper.setActionType(ActionType.clean.name());
         break;
       }
       case HoodieTimeline.COMMIT_ACTION: {
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieCommitMetadata.class);
+        HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(instantDetails.get(), 
HoodieCommitMetadata.class);
         
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
         archivedMetaWrapper.setActionType(ActionType.commit.name());
         break;
       }
       case HoodieTimeline.DELTA_COMMIT_ACTION: {
-        HoodieCommitMetadata deltaCommitMetadata = HoodieCommitMetadata
-                
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieCommitMetadata.class);
+        HoodieCommitMetadata deltaCommitMetadata = 
HoodieCommitMetadata.fromBytes(instantDetails.get(), 
HoodieCommitMetadata.class);
         
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
         archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
         break;
       }
       case HoodieTimeline.REPLACE_COMMIT_ACTION: {
         if (hoodieInstant.isCompleted()) {
-          HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata
-              
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(),
 HoodieReplaceCommitMetadata.class);
+          HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(), 
HoodieReplaceCommitMetadata.class);
           
archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
         } else if (hoodieInstant.isInflight()) {
-          // inflight replacecommit files have the same meta data body as 
HoodieCommitMetadata
+          // inflight replacecommit files have the same metadata body as 
HoodieCommitMetadata
           // so we could re-use it without further creating an inflight 
extension.
           // Or inflight replacecommit files are empty under clustering 
circumstance
-          Option<HoodieCommitMetadata> inflightCommitMetadata = 
getInflightReplaceMetadata(metaClient, hoodieInstant);
+          Option<HoodieCommitMetadata> inflightCommitMetadata = 
getInflightCommitMetadata(instantDetails);
           if (inflightCommitMetadata.isPresent()) {
             
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
           }
         } else {
           // we may have cases with empty HoodieRequestedReplaceMetadata e.g. 
insert_overwrite_table or insert_overwrite
           // without clustering. However, we should revisit the requested 
commit file standardization
-          Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = 
getRequestedReplaceMetadata(metaClient, hoodieInstant);
+          Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = 
getRequestedReplaceMetadata(instantDetails);
           if (requestedReplaceMetadata.isPresent()) {
             
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
           }
@@ -101,27 +104,29 @@ public class MetadataConversionUtils {
       }
       case HoodieTimeline.ROLLBACK_ACTION: {
         if (hoodieInstant.isCompleted()) {
-          
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
-                  
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), 
HoodieRollbackMetadata.class));
+          
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
 HoodieRollbackMetadata.class));
         }
         archivedMetaWrapper.setActionType(ActionType.rollback.name());
         break;
       }
       case HoodieTimeline.SAVEPOINT_ACTION: {
-        
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(
-                
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant).get(), 
HoodieSavepointMetadata.class));
+        
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
 HoodieSavepointMetadata.class));
         archivedMetaWrapper.setActionType(ActionType.savepoint.name());
         break;
       }
       case HoodieTimeline.COMPACTION_ACTION: {
-        HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
-        archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        if (hoodieInstant.isRequested()) {
+          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
+          archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        }
         archivedMetaWrapper.setActionType(ActionType.compaction.name());
         break;
       }
       case HoodieTimeline.LOG_COMPACTION_ACTION: {
-        HoodieCompactionPlan plan = 
CompactionUtils.getLogCompactionPlan(metaClient, hoodieInstant.getTimestamp());
-        archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        if (hoodieInstant.isRequested()) {
+          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
+          archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        }
         archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
         break;
       }
@@ -132,7 +137,7 @@ public class MetadataConversionUtils {
     return archivedMetaWrapper;
   }
 
-  public static HoodieArchivedMetaEntry 
createMetaWrapperForEmptyInstant(HoodieInstant hoodieInstant) throws 
IOException {
+  public static HoodieArchivedMetaEntry 
createMetaWrapperForEmptyInstant(HoodieInstant hoodieInstant) {
     HoodieArchivedMetaEntry archivedMetaWrapper = new 
HoodieArchivedMetaEntry();
     archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
     archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
@@ -173,8 +178,7 @@ public class MetadataConversionUtils {
     return archivedMetaWrapper;
   }
 
-  public static Option<HoodieCommitMetadata> 
getInflightReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant 
instant) throws IOException {
-    Option<byte[]> inflightContent = 
metaClient.getActiveTimeline().getInstantDetails(instant);
+  private static Option<HoodieCommitMetadata> 
getInflightCommitMetadata(Option<byte[]> inflightContent) throws IOException {
     if (!inflightContent.isPresent() || inflightContent.get().length == 0) {
       // inflight files can be empty in some certain cases, e.g. when users 
opt in clustering
       return Option.empty();
@@ -182,8 +186,7 @@ public class MetadataConversionUtils {
     return Option.of(HoodieCommitMetadata.fromBytes(inflightContent.get(), 
HoodieCommitMetadata.class));
   }
 
-  private static Option<HoodieRequestedReplaceMetadata> 
getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant 
instant) throws IOException {
-    Option<byte[]> requestedContent = 
metaClient.getActiveTimeline().getInstantDetails(instant);
+  private static Option<HoodieRequestedReplaceMetadata> 
getRequestedReplaceMetadata(Option<byte[]> requestedContent) throws IOException 
{
     if (!requestedContent.isPresent() || requestedContent.get().length == 0) {
       // requested commit files can be empty in some certain cases, e.g. 
insert_overwrite or insert_overwrite_table.
       // However, it appears requested files are supposed to contain meta data 
and we should revisit the standardization
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 2f57f5455c3..899bd673665 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -110,6 +110,18 @@ public class CleanerUtils {
     return metadataMigrator.upgradeToLatest(cleanMetadata, 
cleanMetadata.getVersion());
   }
 
+  /**
+   * Get Latest Version of Hoodie Cleaner Metadata - Output of cleaner 
operation.
+   * @return Latest version of Clean metadata corresponding to clean instant
+   * @throws IOException
+   */
+  public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient 
metaClient, byte[] details)
+      throws IOException {
+    CleanMetadataMigrator metadataMigrator = new 
CleanMetadataMigrator(metaClient);
+    HoodieCleanMetadata cleanMetadata = 
TimelineMetadataUtils.deserializeHoodieCleanMetadata(details);
+    return metadataMigrator.upgradeToLatest(cleanMetadata, 
cleanMetadata.getVersion());
+  }
+
   public static Option<HoodieInstant> getEarliestCommitToRetain(
       HoodieTimeline commitsTimeline, HoodieCleaningPolicy cleaningPolicy, int 
commitsRetained,
       Instant latestInstant, int hoursRetained, HoodieTimelineTimeZone 
timeZone) {
@@ -159,6 +171,20 @@ public class CleanerUtils {
     return cleanPlanMigrator.upgradeToLatest(cleanerPlan, 
cleanerPlan.getVersion());
   }
 
+  /**
+   * Get Latest version of cleaner plan corresponding to a clean instant.
+   *
+   * @param metaClient   Hoodie Table Meta Client
+   * @return Cleaner plan corresponding to clean instant
+   * @throws IOException
+   */
+  public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient 
metaClient, byte[] details)
+      throws IOException {
+    CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
+    HoodieCleanerPlan cleanerPlan = 
TimelineMetadataUtils.deserializeAvroMetadata(details, HoodieCleanerPlan.class);
+    return cleanPlanMigrator.upgradeToLatest(cleanerPlan, 
cleanerPlan.getVersion());
+  }
+
   /**
    * Convert list of cleanFileInfo instances to list of avro-generated 
HoodieCleanFileInfo instances.
    * @param cleanFileInfoList
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 03fd0688665..0f41f1314e1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -186,10 +186,16 @@ public class CompactionUtils {
    * Util method to fetch both compaction and log compaction plan from 
requestedInstant.
    */
   private static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient 
metaClient, HoodieInstant requestedInstant) {
+    return getCompactionPlan(metaClient, 
metaClient.getActiveTimeline().readCompactionPlanAsBytes(requestedInstant));
+  }
+
+  /**
+   * Util method to fetch both compaction and log compaction plan from 
requestedInstant.
+   */
+  public static HoodieCompactionPlan getCompactionPlan(HoodieTableMetaClient 
metaClient, Option<byte[]> planContent) {
     CompactionPlanMigrator migrator = new CompactionPlanMigrator(metaClient);
     try {
-      HoodieCompactionPlan compactionPlan = 
TimelineMetadataUtils.deserializeCompactionPlan(
-          
metaClient.getActiveTimeline().readCompactionPlanAsBytes(requestedInstant).get());
+      HoodieCompactionPlan compactionPlan = 
TimelineMetadataUtils.deserializeCompactionPlan(planContent.get());
       return migrator.upgradeToLatest(compactionPlan, 
compactionPlan.getVersion());
     } catch (IOException e) {
       throw new HoodieException(e);

Reply via email to