yihua commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1974311163


##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -183,6 +195,129 @@ public interface HoodieTimeline extends 
HoodieInstantReader, Serializable {
    */
   HoodieTimeline getWriteTimeline();
 
+  /**
+   * Load and deserialize the content of an instant into the specified class 
type.
+   *
+   * @param instant The instant to load content from
+   * @param clazz The target class to deserialize into
+   * @return Deserialized instant content
+   * @throws IOException when reading instant content fails
+   */
+  default <T> T loadInstantContent(HoodieInstant instant, Class<T> clazz) 
throws IOException {

Review Comment:
   nit: `readInstantContent` would be a better name



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -183,6 +195,129 @@ public interface HoodieTimeline extends 
HoodieInstantReader, Serializable {
    */
   HoodieTimeline getWriteTimeline();
 
+  /**
+   * Load and deserialize the content of an instant into the specified class 
type.
+   *
+   * @param instant The instant to load content from
+   * @param clazz The target class to deserialize into
+   * @return Deserialized instant content
+   * @throws IOException when reading instant content fails
+   */
+  default <T> T loadInstantContent(HoodieInstant instant, Class<T> clazz) 
throws IOException {
+    TimelineLayout layout = 
TimelineLayout.fromVersion(getTimelineLayoutVersion());
+    return layout.getCommitMetadataSerDe().deserialize(instant, 
getInstantContentStream(instant), () -> isEmpty(instant), clazz);
+  }
+
+  /**
+   * Load and deserialize cleaner plan from an instant.
+   *
+   * @param instant The instant containing the cleaner plan
+   * @return Deserialized HoodieCleanerPlan
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCleanerPlan loadCleanerPlan(HoodieInstant instant) throws 
IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieCleanerPlan.class);
+  }
+
+  /**
+   * Load and deserialize compaction plan from an instant.
+   *
+   * @param instant The instant containing the compaction plan
+   * @return Deserialized HoodieCompactionPlan
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCompactionPlan loadCompactionPlan(HoodieInstant instant) 
throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieCompactionPlan.class);
+  }
+
+  /**
+   * Load and deserialize clean metadata from an instant.
+   *
+   * @param instant The instant containing the clean metadata
+   * @return Deserialized HoodieCleanMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCleanMetadata loadHoodieCleanMetadata(HoodieInstant instant) 
throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieCleanMetadata.class);
+  }
+
+  /**
+   * Load and deserialize rollback metadata from an instant.
+   *
+   * @param instant The instant containing the rollback metadata
+   * @return Deserialized HoodieRollbackMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieRollbackMetadata loadHoodieRollbackMetadata(HoodieInstant 
instant) throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieRollbackMetadata.class);
+  }
+
+  /**
+   * Load and deserialize restore metadata from an instant.
+   *
+   * @param instant The instant containing the restore metadata
+   * @return Deserialized HoodieRestoreMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieRestoreMetadata loadHoodieRestoreMetadata(HoodieInstant 
instant) throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieRestoreMetadata.class);
+  }
+
+  /**
+   * Load and deserialize savepoint metadata from an instant.
+   *
+   * @param instant The instant containing the savepoint metadata
+   * @return Deserialized HoodieSavepointMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieSavepointMetadata loadHoodieSavepointMetadata(HoodieInstant 
instant) throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieSavepointMetadata.class);
+  }
+
+  /**
+   * Load and deserialize requested replace metadata from an instant.
+   *
+   * @param instant The instant containing the requested replace metadata
+   * @return Deserialized HoodieRequestedReplaceMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieRequestedReplaceMetadata 
loadRequestedReplaceMetadata(HoodieInstant instant) throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieRequestedReplaceMetadata.class);
+  }
+
+  /**
+   * Load and deserialize index plan from an instant.
+   *
+   * @param instant The instant containing the index plan
+   * @return Deserialized HoodieIndexPlan
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieIndexPlan loadIndexPlan(HoodieInstant instant) throws 
IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieIndexPlan.class);
+  }
+
+  /**
+   * Load and deserialize commit metadata in Avro format from an instant.
+   *
+   * @param instant The instant containing the commit metadata
+   * @return Deserialized HoodieCommitMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCommitMetadata loadCommitMetadataAvro(HoodieInstant instant) 
throws IOException {

Review Comment:
   ```suggestion
     default HoodieCommitMetadata readCommitMetadataToAvro(HoodieInstant 
instant) throws IOException {
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -183,6 +195,129 @@ public interface HoodieTimeline extends 
HoodieInstantReader, Serializable {
    */
   HoodieTimeline getWriteTimeline();
 
+  /**
+   * Load and deserialize the content of an instant into the specified class 
type.
+   *
+   * @param instant The instant to load content from
+   * @param clazz The target class to deserialize into
+   * @return Deserialized instant content
+   * @throws IOException when reading instant content fails
+   */
+  default <T> T loadInstantContent(HoodieInstant instant, Class<T> clazz) 
throws IOException {
+    TimelineLayout layout = 
TimelineLayout.fromVersion(getTimelineLayoutVersion());
+    return layout.getCommitMetadataSerDe().deserialize(instant, 
getInstantContentStream(instant), () -> isEmpty(instant), clazz);
+  }
+
+  /**
+   * Load and deserialize cleaner plan from an instant.
+   *
+   * @param instant The instant containing the cleaner plan
+   * @return Deserialized HoodieCleanerPlan
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCleanerPlan loadCleanerPlan(HoodieInstant instant) throws 
IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieCleanerPlan.class);
+  }
+
+  /**
+   * Load and deserialize compaction plan from an instant.
+   *
+   * @param instant The instant containing the compaction plan
+   * @return Deserialized HoodieCompactionPlan
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCompactionPlan loadCompactionPlan(HoodieInstant instant) 
throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieCompactionPlan.class);
+  }
+
+  /**
+   * Load and deserialize clean metadata from an instant.
+   *
+   * @param instant The instant containing the clean metadata
+   * @return Deserialized HoodieCleanMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCleanMetadata loadHoodieCleanMetadata(HoodieInstant instant) 
throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieCleanMetadata.class);
+  }
+
+  /**
+   * Load and deserialize rollback metadata from an instant.
+   *
+   * @param instant The instant containing the rollback metadata
+   * @return Deserialized HoodieRollbackMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieRollbackMetadata loadHoodieRollbackMetadata(HoodieInstant 
instant) throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieRollbackMetadata.class);
+  }
+
+  /**
+   * Load and deserialize restore metadata from an instant.
+   *
+   * @param instant The instant containing the restore metadata
+   * @return Deserialized HoodieRestoreMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieRestoreMetadata loadHoodieRestoreMetadata(HoodieInstant 
instant) throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieRestoreMetadata.class);
+  }
+
+  /**
+   * Load and deserialize savepoint metadata from an instant.
+   *
+   * @param instant The instant containing the savepoint metadata
+   * @return Deserialized HoodieSavepointMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieSavepointMetadata loadHoodieSavepointMetadata(HoodieInstant 
instant) throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieSavepointMetadata.class);
+  }
+
+  /**
+   * Load and deserialize requested replace metadata from an instant.
+   *
+   * @param instant The instant containing the requested replace metadata
+   * @return Deserialized HoodieRequestedReplaceMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieRequestedReplaceMetadata 
loadRequestedReplaceMetadata(HoodieInstant instant) throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieRequestedReplaceMetadata.class);
+  }
+
+  /**
+   * Load and deserialize index plan from an instant.
+   *
+   * @param instant The instant containing the index plan
+   * @return Deserialized HoodieIndexPlan
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieIndexPlan loadIndexPlan(HoodieInstant instant) throws 
IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieIndexPlan.class);
+  }
+
+  /**
+   * Load and deserialize commit metadata in Avro format from an instant.
+   *
+   * @param instant The instant containing the commit metadata
+   * @return Deserialized HoodieCommitMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCommitMetadata loadCommitMetadataAvro(HoodieInstant instant) 
throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieCommitMetadata.class);
+  }
+
+  /**
+   * Load and deserialize replace commit metadata in Avro format from an 
instant.
+   *
+   * @param instant The instant containing the replace commit metadata
+   * @return Deserialized HoodieReplaceCommitMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieReplaceCommitMetadata 
loadReplaceCommitMetadataAvro(HoodieInstant instant) throws IOException {

Review Comment:
   ```suggestion
     default HoodieReplaceCommitMetadata 
readReplaceCommitMetadataToAvro(HoodieInstant instant) throws IOException {
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -183,6 +195,129 @@ public interface HoodieTimeline extends 
HoodieInstantReader, Serializable {
    */
   HoodieTimeline getWriteTimeline();
 
+  /**
+   * Load and deserialize the content of an instant into the specified class 
type.
+   *
+   * @param instant The instant to load content from
+   * @param clazz The target class to deserialize into
+   * @return Deserialized instant content
+   * @throws IOException when reading instant content fails
+   */
+  default <T> T loadInstantContent(HoodieInstant instant, Class<T> clazz) 
throws IOException {
+    TimelineLayout layout = 
TimelineLayout.fromVersion(getTimelineLayoutVersion());
+    return layout.getCommitMetadataSerDe().deserialize(instant, 
getInstantContentStream(instant), () -> isEmpty(instant), clazz);
+  }
+
+  /**
+   * Load and deserialize cleaner plan from an instant.
+   *
+   * @param instant The instant containing the cleaner plan
+   * @return Deserialized HoodieCleanerPlan
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCleanerPlan loadCleanerPlan(HoodieInstant instant) throws 
IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieCleanerPlan.class);
+  }
+
+  /**
+   * Load and deserialize compaction plan from an instant.
+   *
+   * @param instant The instant containing the compaction plan
+   * @return Deserialized HoodieCompactionPlan
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCompactionPlan loadCompactionPlan(HoodieInstant instant) 
throws IOException {
+    return deserializeAvroMetadata(getInstantContentStream(instant), 
HoodieCompactionPlan.class);
+  }
+
+  /**
+   * Load and deserialize clean metadata from an instant.
+   *
+   * @param instant The instant containing the clean metadata
+   * @return Deserialized HoodieCleanMetadata
+   * @throws IOException when reading instant content fails
+   */
+  default HoodieCleanMetadata loadHoodieCleanMetadata(HoodieInstant instant) 
throws IOException {

Review Comment:
   We can get rid of `Hoodie` in the method name.
   ```suggestion
     default HoodieCleanMetadata loadCleanMetadata(HoodieInstant instant) 
throws IOException {
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -332,24 +335,13 @@ public static HoodieArchivedMetaEntry 
createMetaWrapperForEmptyInstant(HoodieIns
     return archivedMetaWrapper;
   }
 
-  private static Option<HoodieCommitMetadata> 
getInflightCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant 
instant,
-                                                                        
Option<byte[]> inflightContent) throws IOException {
-    if (!inflightContent.isPresent() || inflightContent.get().length == 0) {
-      // inflight files can be empty in some certain cases, e.g. when users 
opt in clustering
+  private static <T extends HoodieCommitMetadata> Option<T> 
getCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant, 
Class<T> clazz) throws IOException {
+    T commitMetadata = 
metaClient.getActiveTimeline().loadInstantContent(instant, clazz);
+    // an empty file will return the default instance with an UNKNOWN 
operation type and in that case we return an empty option
+    if (commitMetadata.getOperationType() == WriteOperationType.UNKNOWN) {

Review Comment:
   Could non-empty commit metadata also have an `UNKNOWN` write operation type?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -173,72 +175,73 @@ public static HoodieArchivedMetaEntry createMetaWrapper(
     archivedMetaWrapper.setActionState(HoodieInstant.State.COMPLETED.name());
     archivedMetaWrapper.setStateTransitionTime(completionTime);
     String actionType = 
lsmTimelineRecord.get(ArchivedTimelineV2.ACTION_ARCHIVED_META_FIELD).toString();
-    HoodieInstant instant = 
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
 actionType, instantTime, completionTime);
+    HoodieInstant hoodieInstant = 
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
 actionType, instantTime, completionTime);
     switch (actionType) {
       case HoodieTimeline.CLEAN_ACTION: {
-        
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 instantDetails.get()));
-        
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 planBytes.get()));
+        
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 new ByteArrayInputStream(instantDetails.get())));
+        
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 new ByteArrayInputStream(planBytes.get())));
         archivedMetaWrapper.setActionType(ActionType.clean.name());
         break;
       }
       case HoodieTimeline.COMMIT_ACTION: {
-        HoodieCommitMetadata commitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(), 
HoodieCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
+        getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
+            .ifPresent(commitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
         archivedMetaWrapper.setActionType(ActionType.commit.name());
 
         if (planBytes.isPresent()) {
           // this should be a compaction
-          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, planBytes);
+          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, new 
ByteArrayInputStream(planBytes.get()));
           archivedMetaWrapper.setHoodieCompactionPlan(plan);
         }
         break;
       }
       case HoodieTimeline.DELTA_COMMIT_ACTION: {
-        HoodieCommitMetadata deltaCommitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(), 
HoodieCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
+        getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
+            .ifPresent(commitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
         archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
 
         if (planBytes.isPresent()) {
           // this should be a log compaction
-          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, planBytes);
+          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, new 
ByteArrayInputStream(planBytes.get()));
           archivedMetaWrapper.setHoodieCompactionPlan(plan);
         }
         break;
       }
       case HoodieTimeline.REPLACE_COMMIT_ACTION:
       case HoodieTimeline.CLUSTERING_ACTION: {
-        HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(), 
HoodieReplaceCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata));
+        getCommitMetadata(metaClient, hoodieInstant, 
HoodieReplaceCommitMetadata.class)
+            .ifPresent(replaceCommitMetadata -> 
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata)));
 
         // inflight replacecommit files have the same metadata body as 
HoodieCommitMetadata
         // so we could re-use it without further creating an inflight 
extension.
         // Or inflight replacecommit files are empty under clustering 
circumstance
-        Option<HoodieCommitMetadata> inflightCommitMetadata = 
getInflightCommitMetadata(metaClient, instant, instantDetails);
+        Option<HoodieCommitMetadata> inflightCommitMetadata = 
getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class);
         if (inflightCommitMetadata.isPresent()) {
           
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
         }
-        archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
+        archivedMetaWrapper.setActionType(
+            
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ? 
ActionType.replacecommit.name() : ActionType.clustering.name());

Review Comment:
   This is for V1 so there is no clustering action for requested or inflight.  
So let's keep using `ActionType.replacecommit.name()` here.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -188,20 +186,20 @@ public static List<String> 
getAffectedPartitions(HoodieTimeline timeline) {
           }
         case HoodieTimeline.CLEAN_ACTION:
           try {
-            HoodieCleanMetadata cleanMetadata = 
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(s).get());
+            HoodieCleanMetadata cleanMetadata = 
timeline.loadHoodieCleanMetadata(s);
             return cleanMetadata.getPartitionMetadata().keySet().stream();
           } catch (IOException e) {
             throw new HoodieIOException("Failed to get partitions cleaned at " 
+ s, e);
           }
         case HoodieTimeline.ROLLBACK_ACTION:
           try {
-            return 
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(timeline.getInstantDetails(s).get()).getPartitionMetadata().keySet().stream();
+            return 
timeline.loadHoodieRollbackMetadata(s).getPartitionMetadata().keySet().stream();
           } catch (IOException e) {
             throw new HoodieIOException("Failed to get partitions rolledback 
at " + s, e);
           }
         case HoodieTimeline.RESTORE_ACTION:
           try {
-            HoodieRestoreMetadata restoreMetadata = 
TimelineMetadataUtils.deserializeAvroMetadata(timeline.getInstantDetails(s).get(),
 HoodieRestoreMetadata.class);
+            HoodieRestoreMetadata restoreMetadata = 
timeline.loadInstantContent(s, HoodieRestoreMetadata.class);

Review Comment:
   As discussed with @Davis-Zhang-Onehouse, we should have wrapper methods in 
the timeline to read specific metadata and avoid calling 
`loadInstantContent(instant, clazz)` directly.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -395,19 +387,15 @@ public static HoodieReplaceCommitMetadata 
convertReplaceCommitMetadataToPojo(org
   }
 
   /**
-   * Convert commit metadata from avro to json.
+   * Convert replacecommit metadata from avro to pojo.
    */
-  public static <T extends SpecificRecordBase> byte[] 
convertCommitMetadataToJsonBytes(T avroMetaData, Class<T> clazz) {
-    Schema avroSchema = clazz == 
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.class ? 
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.getClassSchema() :
-        org.apache.hudi.avro.model.HoodieCommitMetadata.getClassSchema();
-    try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
-      JsonEncoder jsonEncoder = new JsonEncoder(avroSchema, outputStream);
-      DatumWriter<GenericRecord> writer = avroMetaData instanceof 
SpecificRecord ? new SpecificDatumWriter<>(avroSchema) : new 
GenericDatumWriter<>(avroSchema);
-      writer.write(avroMetaData, jsonEncoder);
-      jsonEncoder.flush();
-      return outputStream.toByteArray();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to convert to JSON.", e);
+  public static HoodieReplaceCommitMetadata 
convertReplaceCommitMetadataAvroToPojo(org.apache.hudi.avro.model.HoodieReplaceCommitMetadata
 replaceCommitMetadata) {

Review Comment:
   ```suggestion
     public static HoodieReplaceCommitMetadata 
convertReplaceCommitMetadataToPojo(org.apache.hudi.avro.model.HoodieReplaceCommitMetadata
 replaceCommitMetadata) {
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -38,11 +38,12 @@
 import org.apache.hudi.exception.HoodieTimeTravelException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
-

Review Comment:
   keep import grouping



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -56,100 +52,106 @@
  */
 public class MetadataConversionUtils {
 
-  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant 
hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
-    Option<byte[]> instantDetails = 
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant);
-    if (hoodieInstant.isCompleted() && instantDetails.get().length == 0) {
-      // in local FS and HDFS, there could be empty completed instants due to 
crash.
-      // let's add an entry to the archival, even if not for the plan.
-      return createMetaWrapperForEmptyInstant(hoodieInstant);
-    }
-    HoodieArchivedMetaEntry archivedMetaWrapper = new 
HoodieArchivedMetaEntry();
-    archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime());
-    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
-    
archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime());
-    switch (hoodieInstant.getAction()) {
-      case HoodieTimeline.CLEAN_ACTION: {
-        if (hoodieInstant.isCompleted()) {
-          
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 instantDetails.get()));
-        } else {
-          
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 instantDetails.get()));
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant 
hoodieInstant, HoodieTableMetaClient metaClient) {
+    try {
+      HoodieArchivedMetaEntry archivedMetaWrapper = new 
HoodieArchivedMetaEntry();
+      archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime());
+      archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+      
archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime());
+      CommitMetadataSerDe serDe = metaClient.getCommitMetadataSerDe();
+      switch (hoodieInstant.getAction()) {
+        case HoodieTimeline.CLEAN_ACTION: {
+          if (hoodieInstant.isCompleted()) {
+            
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 hoodieInstant));
+          } else {
+            
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 hoodieInstant));
+          }
+          archivedMetaWrapper.setActionType(ActionType.clean.name());
+          break;
         }
-        archivedMetaWrapper.setActionType(ActionType.clean.name());
-        break;
-      }
-      case HoodieTimeline.COMMIT_ACTION: {
-        HoodieCommitMetadata commitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, 
instantDetails.get(), HoodieCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
-        archivedMetaWrapper.setActionType(ActionType.commit.name());
-        break;
-      }
-      case HoodieTimeline.DELTA_COMMIT_ACTION: {
-        HoodieCommitMetadata deltaCommitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, 
instantDetails.get(), HoodieCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
-        archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
-        break;
-      }
-      case HoodieTimeline.REPLACE_COMMIT_ACTION:
-      case HoodieTimeline.CLUSTERING_ACTION: {
-        if (hoodieInstant.isCompleted()) {
-          HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(), 
HoodieReplaceCommitMetadata.class);
-          
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata));
-        } else if (hoodieInstant.isInflight()) {
-          // inflight replacecommit files have the same metadata body as 
HoodieCommitMetadata
-          // so we could re-use it without further creating an inflight 
extension.
-          // Or inflight replacecommit files are empty under clustering 
circumstance
-          Option<HoodieCommitMetadata> inflightCommitMetadata = 
getInflightCommitMetadata(metaClient, hoodieInstant, instantDetails);
-          if (inflightCommitMetadata.isPresent()) {
-            
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
+        case HoodieTimeline.COMMIT_ACTION: {
+          getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
+              .ifPresent(commitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
+          archivedMetaWrapper.setActionType(ActionType.commit.name());
+          break;
+        }
+        case HoodieTimeline.DELTA_COMMIT_ACTION: {
+          getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
+              .ifPresent(deltaCommitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata)));
+          archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
+          break;
+        }
+        case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        case HoodieTimeline.CLUSTERING_ACTION: {
+          if (hoodieInstant.isCompleted()) {
+            getCommitMetadata(metaClient, hoodieInstant, 
HoodieReplaceCommitMetadata.class)
+                .ifPresent(replaceCommitMetadata -> 
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata)));
+          } else if (hoodieInstant.isInflight()) {
+            // inflight replacecommit files have the same metadata body as 
HoodieCommitMetadata
+            // so we could re-use it without further creating an inflight 
extension.
+            // Or inflight replacecommit files are empty under clustering 
circumstance
+            Option<HoodieCommitMetadata> inflightCommitMetadata = 
getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class);
+            if (inflightCommitMetadata.isPresent()) {
+              
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
+            }
+          } else {
+            // we may have cases with empty HoodieRequestedReplaceMetadata 
e.g. insert_overwrite_table or insert_overwrite
+            // without clustering. However, we should revisit the requested 
commit file standardization
+            Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = 
Option.of(metaClient.getActiveTimeline()
+                .loadRequestedReplaceMetadata(hoodieInstant));
+            if (requestedReplaceMetadata.isPresent()) {
+              
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
+            }
           }
-        } else {
-          // we may have cases with empty HoodieRequestedReplaceMetadata e.g. 
insert_overwrite_table or insert_overwrite
-          // without clustering. However, we should revisit the requested 
commit file standardization
-          Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = 
getRequestedReplaceMetadata(instantDetails);
-          if (requestedReplaceMetadata.isPresent()) {
-            
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
+          archivedMetaWrapper.setActionType(
+              
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ? 
ActionType.replacecommit.name() : ActionType.clustering.name());
+          break;
+        }
+        case HoodieTimeline.ROLLBACK_ACTION: {
+          if (hoodieInstant.isCompleted()) {
+            
archivedMetaWrapper.setHoodieRollbackMetadata(metaClient.getActiveTimeline().loadInstantContent(hoodieInstant,
 HoodieRollbackMetadata.class));
           }
+          archivedMetaWrapper.setActionType(ActionType.rollback.name());
+          break;
         }
-        archivedMetaWrapper.setActionType(
-            
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ? 
ActionType.replacecommit.name() : ActionType.clustering.name());
-        break;
-      }
-      case HoodieTimeline.ROLLBACK_ACTION: {
-        if (hoodieInstant.isCompleted()) {
-          
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
 HoodieRollbackMetadata.class));
+        case HoodieTimeline.SAVEPOINT_ACTION: {
+          
archivedMetaWrapper.setHoodieSavePointMetadata(metaClient.getActiveTimeline().loadInstantContent(hoodieInstant,
 HoodieSavepointMetadata.class));
+          archivedMetaWrapper.setActionType(ActionType.savepoint.name());
+          break;
         }
-        archivedMetaWrapper.setActionType(ActionType.rollback.name());
-        break;
-      }
-      case HoodieTimeline.SAVEPOINT_ACTION: {
-        
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
 HoodieSavepointMetadata.class));
-        archivedMetaWrapper.setActionType(ActionType.savepoint.name());
-        break;
-      }
-      case HoodieTimeline.COMPACTION_ACTION: {
-        if (hoodieInstant.isRequested()) {
-          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
-          archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        case HoodieTimeline.COMPACTION_ACTION: {
+          if (hoodieInstant.isRequested()) {
+            HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+            archivedMetaWrapper.setHoodieCompactionPlan(plan);
+          }
+          archivedMetaWrapper.setActionType(ActionType.compaction.name());
+          break;
         }
-        archivedMetaWrapper.setActionType(ActionType.compaction.name());
-        break;
-      }
-      case HoodieTimeline.LOG_COMPACTION_ACTION: {
-        if (hoodieInstant.isRequested()) {
-          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
-          archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        case HoodieTimeline.LOG_COMPACTION_ACTION: {
+          if (hoodieInstant.isRequested()) {
+            HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+            archivedMetaWrapper.setHoodieCompactionPlan(plan);
+          }
+          archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
+          break;
+        }
+        default: {
+          throw new UnsupportedOperationException("Action not fully supported 
yet");
         }
-        archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
-        break;
       }
-      default: {
-        throw new UnsupportedOperationException("Action not fully supported 
yet");
+      return archivedMetaWrapper;
+    } catch (IOException | HoodieIOException ex) {
+      if (metaClient.getActiveTimeline().isEmpty(hoodieInstant)) {
+        // in local FS and HDFS, there could be empty completed instants due 
to crash.
+        // let's add an entry to the archival, even if not for the plan.
+        return createMetaWrapperForEmptyInstant(hoodieInstant);
       }
+      throw new HoodieException(ex);
     }
-    return archivedMetaWrapper;
   }
 
   /**
+   * TODO(reviewers) - new code applied similar refactoring, please pay close 
attention.

Review Comment:
   Remove this TODO?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java:
##########
@@ -556,31 +550,14 @@ public Option<HoodieInstant> 
getFirstNonSavepointCommitByCompletionTime() {
 
   @Override
   public Option<byte[]> getInstantDetails(HoodieInstant instant) {
-    return  getInstantReader().getInstantDetails(instant);
+    return getInstantReader().getInstantDetails(instant);

Review Comment:
   Sounds good.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -604,4 +602,22 @@ public static boolean isDeletePartition(WriteOperationType 
operation) {
         || operation == WriteOperationType.INSERT_OVERWRITE_TABLE
         || operation == WriteOperationType.INSERT_OVERWRITE;
   }
+
+  public static boolean isEmpty(HoodieTableMetaClient metaClient, 
HoodieInstant instant) {
+    try {
+      return metaClient.getStorage()
+          .getPathInfo(new StoragePath(metaClient.getTimelinePath(), 
metaClient.getInstantFileNameGenerator().getFileName(instant)))
+          .getLength() == 0;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to check emptiness of instant " + 
instant, e);
+    }
+  }
+
+  public static Option<InputStream> getInputStreamOptionLegacy(HoodieTimeline 
timeline, HoodieInstant instant) {

Review Comment:
   Let's have a JIRA to remove this in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to