Davis-Zhang-Onehouse commented on code in PR #12826:
URL: https://github.com/apache/hudi/pull/12826#discussion_r1975933782


##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/CleansCommand.java:
##########
@@ -77,8 +75,7 @@ public String showCleans(
     List<HoodieInstant> cleans = 
timeline.getReverseOrderedInstants().collect(Collectors.toList());
     List<Comparable[]> rows = new ArrayList<>();
     for (HoodieInstant clean : cleans) {
-      HoodieCleanMetadata cleanMetadata =
-          
TimelineMetadataUtils.deserializeHoodieCleanMetadata(timeline.getInstantDetails(clean).get());
+      HoodieCleanMetadata cleanMetadata = 
timeline.loadHoodieCleanMetadata(clean);

Review Comment:
   As discussed, that API can only return the `Object` type, so callers need to cast 
explicitly. We chose not to go this route.



##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java:
##########
@@ -125,9 +124,7 @@ public String compactionShow(
     HoodieTableMetaClient client = checkAndGetMetaClient();
     HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
     InstantGenerator instantGenerator = client.getInstantGenerator();
-    HoodieCompactionPlan compactionPlan = 
TimelineMetadataUtils.deserializeCompactionPlan(
-        activeTimeline.readCompactionPlanAsBytes(
-            
instantGenerator.getCompactionRequestedInstant(compactionInstantTime)).get());
+    HoodieCompactionPlan compactionPlan = 
activeTimeline.loadCompactionPlan(instantGenerator.getCompactionRequestedInstant(compactionInstantTime));

Review Comment:
   same as above



##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/RestoresCommand.java:
##########
@@ -98,9 +96,7 @@ public String showRestore(
   private void addDetailsOfCompletedRestore(HoodieActiveTimeline 
activeTimeline, List<Comparable[]> rows,
                                             HoodieInstant restoreInstant) 
throws IOException {
     HoodieRestoreMetadata instantMetadata;
-    Option<byte[]> instantDetails = 
activeTimeline.getInstantDetails(restoreInstant);
-    instantMetadata = TimelineMetadataUtils
-            .deserializeAvroMetadata(instantDetails.get(), 
HoodieRestoreMetadata.class);
+    instantMetadata = activeTimeline.loadInstantContent(restoreInstant, 
HoodieRestoreMetadata.class);

Review Comment:
   done



##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/RollbacksCommand.java:
##########
@@ -97,8 +95,8 @@ public String showRollback(
       throws IOException {
     HoodieActiveTimeline activeTimeline = 
HoodieCLI.getTableMetaClient().getActiveTimeline();
     final List<Comparable[]> rows = new ArrayList<>();
-    HoodieRollbackMetadata metadata = 
TimelineMetadataUtils.deserializeAvroMetadata(
-        
activeTimeline.getInstantDetails(HoodieCLI.getTableMetaClient().createNewInstant(State.COMPLETED,
 ROLLBACK_ACTION, rollbackInstant)).get(),
+    HoodieRollbackMetadata metadata =  activeTimeline.loadInstantContent(

Review Comment:
   done



##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java:
##########
@@ -77,8 +77,7 @@ public String writeAmplificationStats(
     TimelineLayout layout = 
TimelineLayout.fromVersion(timeline.getTimelineLayoutVersion());
     for (HoodieInstant instantTime : timeline.getInstants()) {
       String waf = "0";
-      HoodieCommitMetadata commit = 
layout.getCommitMetadataSerDe().deserialize(instantTime, 
activeTimeline.getInstantDetails(instantTime).get(),
-          HoodieCommitMetadata.class);
+      HoodieCommitMetadata commit = 
activeTimeline.loadInstantContent(instantTime, HoodieCommitMetadata.class);

Review Comment:
   done



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java:
##########
@@ -289,7 +289,7 @@ static boolean 
rewriteTimelineV1InstantFileToV2Format(HoodieInstant instant, Hoo
     boolean success = true;
     if (instant.getAction().equals(COMMIT_ACTION) || 
instant.getAction().equals(DELTA_COMMIT_ACTION) || 
(instant.getAction().equals(REPLACE_COMMIT_ACTION) && instant.isCompleted())) {
       Class<? extends HoodieCommitMetadata> clazz = 
instant.getAction().equals(REPLACE_COMMIT_ACTION) ? 
HoodieReplaceCommitMetadata.class : HoodieCommitMetadata.class;
-      HoodieCommitMetadata commitMetadata = 
commitMetadataSerDeV1.deserialize(instant, 
metaClient.getActiveTimeline().getInstantDetails(instant).get(), clazz);
+      HoodieCommitMetadata commitMetadata = 
metaClient.getActiveTimeline().loadInstantContent(instant,clazz);

Review Comment:
   need to add test



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java:
##########
@@ -912,9 +906,8 @@ protected void testMetadataStatsOnCommit(boolean 
populateMetaFields, Function tr
 
     // Read from commit file
     HoodieInstant instant = INSTANT_GENERATOR.createNewInstant(COMPLETED, 
COMMIT_ACTION, instantTime0);
-    HoodieCommitMetadata metadata = 
metaClient.getCommitMetadataSerDe().deserialize(instant,
-            
createMetaClient().reloadActiveTimeline().getInstantDetails(instant).get(),
-            HoodieCommitMetadata.class);
+    HoodieActiveTimeline activeTimeline = 
createMetaClient().reloadActiveTimeline();
+    HoodieCommitMetadata metadata = activeTimeline.loadInstantContent(instant, 
HoodieCommitMetadata.class);

Review Comment:
   done



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java:
##########
@@ -587,49 +584,35 @@ private static HoodieFileGroup 
buildFileGroup(List<String> baseFileCommitTimes,
     return group;
   }
 
-  private static Option<byte[]> getSavepointBytes(String partition, 
List<String> paths) {
-    try {
-      Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new 
HashMap<>();
-      List<String> fileNames = paths.stream().map(path -> 
path.substring(path.lastIndexOf("/") + 1)).collect(Collectors.toList());
-      partitionMetadata.put(partition, new 
HoodieSavepointPartitionMetadata(partition, fileNames));
-      HoodieSavepointMetadata savepointMetadata =
-          new HoodieSavepointMetadata("user", 1L, "comments", 
partitionMetadata, 1);
-      return 
TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata);
-    } catch (IOException ex) {
-      throw new UncheckedIOException(ex);
-    }
+  private static HoodieSavepointMetadata getSavepointBytes(String partition, 
List<String> paths) {
+    Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new 
HashMap<>();
+    List<String> fileNames = paths.stream().map(path -> 
path.substring(path.lastIndexOf("/") + 1)).collect(Collectors.toList());
+    partitionMetadata.put(partition, new 
HoodieSavepointPartitionMetadata(partition, fileNames));
+    return new HoodieSavepointMetadata("user", 1L, "comments", 
partitionMetadata, 1);
   }
 
-  private static Pair<HoodieCleanMetadata, Option<byte[]>> 
getCleanCommitMetadata(List<String> partitions, String instantTime, String 
earliestCommitToRetain,
+  private static Pair<HoodieCleanMetadata, HoodieCleanMetadata> 
getCleanCommitMetadata(List<String> partitions, String instantTime, String 
earliestCommitToRetain,
                                                                                
   String lastCompletedTime, Set<String> savepointsToTrack, Option<String> 
earliestCommitToNotArchive) {
-    try {
-      Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new 
HashMap<>();
-      partitions.forEach(partition -> partitionMetadata.put(partition, new 
HoodieCleanPartitionMetadata(partition, 
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
-          Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), false)));
-      Map<String, String> extraMetadata = new HashMap<>();
-      extraMetadata.put(SAVEPOINTED_TIMESTAMPS, 
savepointsToTrack.stream().collect(Collectors.joining(",")));
-      HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime, 
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
-          CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, 
extraMetadata.isEmpty() ? null : extraMetadata);
-      return Pair.of(cleanMetadata, 
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
-    } catch (IOException ex) {
-      throw new UncheckedIOException(ex);
-    }
+    Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new 
HashMap<>();
+    partitions.forEach(partition -> partitionMetadata.put(partition, new 
HoodieCleanPartitionMetadata(partition, 
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
+        Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), false)));
+    Map<String, String> extraMetadata = new HashMap<>();
+    extraMetadata.put(SAVEPOINTED_TIMESTAMPS, 
savepointsToTrack.stream().collect(Collectors.joining(",")));
+    HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime, 
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
+        CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, 
extraMetadata.isEmpty() ? null : extraMetadata);
+    return Pair.of(cleanMetadata, cleanMetadata);
   }
 
-  private static Pair<HoodieSavepointMetadata, Option<byte[]>> 
getSavepointMetadata(List<String> partitions) {
-    try {
-      Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new 
HashMap<>();
-      partitions.forEach(partition -> partitionMetadata.put(partition, new 
HoodieSavepointPartitionMetadata(partition, Collections.emptyList())));
-      HoodieSavepointMetadata savepointMetadata =
-          new HoodieSavepointMetadata("user", 1L, "comments", 
partitionMetadata, 1);
-      return Pair.of(savepointMetadata, 
TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata));
-    } catch (IOException ex) {
-      throw new UncheckedIOException(ex);
-    }
+  private static Pair<HoodieSavepointMetadata, HoodieSavepointMetadata> 
getSavepointMetadata(List<String> partitions) {
+    Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new 
HashMap<>();
+    partitions.forEach(partition -> partitionMetadata.put(partition, new 
HoodieSavepointPartitionMetadata(partition, Collections.emptyList())));
+    HoodieSavepointMetadata savepointMetadata =
+        new HoodieSavepointMetadata("user", 1L, "comments", partitionMetadata, 
1);
+    return Pair.of(savepointMetadata, savepointMetadata);

Review Comment:
   fixed



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java:
##########
@@ -194,8 +197,8 @@ public void testArchivedClean() throws Exception {
     HoodieLSMTimelineInstant archived = 
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime),
 metaClient);
     assertEquals(newCommitTime, archived.getInstantTime());
     assertEquals(HoodieTimeline.CLEAN_ACTION, archived.getAction());
-    assertDoesNotThrow(() -> CleanerUtils.getCleanerMetadata(metaClient, 
archived.getMetadata().array()));
-    assertDoesNotThrow(() -> 
TimelineMetadataUtils.deserializeCleanerPlan(archived.getPlan().array()));
+    assertDoesNotThrow(() -> CleanerUtils.getCleanerMetadata(metaClient, new 
ByteArrayInputStream(archived.getMetadata().array())));
+    assertDoesNotThrow(() -> deserializeAvroMetadata(new 
ByteArrayInputStream(archived.getPlan().array()),  HoodieCleanerPlan.class));

Review Comment:
   It has to be kept as is: because a raw `byte[]` is given, we have to use the low-level 
deserialize API instead of going through the Hoodie timeline.



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/TestCleanPlanner.java:
##########
@@ -587,49 +584,35 @@ private static HoodieFileGroup 
buildFileGroup(List<String> baseFileCommitTimes,
     return group;
   }
 
-  private static Option<byte[]> getSavepointBytes(String partition, 
List<String> paths) {
-    try {
-      Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new 
HashMap<>();
-      List<String> fileNames = paths.stream().map(path -> 
path.substring(path.lastIndexOf("/") + 1)).collect(Collectors.toList());
-      partitionMetadata.put(partition, new 
HoodieSavepointPartitionMetadata(partition, fileNames));
-      HoodieSavepointMetadata savepointMetadata =
-          new HoodieSavepointMetadata("user", 1L, "comments", 
partitionMetadata, 1);
-      return 
TimelineMetadataUtils.serializeSavepointMetadata(savepointMetadata);
-    } catch (IOException ex) {
-      throw new UncheckedIOException(ex);
-    }
+  private static HoodieSavepointMetadata getSavepointBytes(String partition, 
List<String> paths) {
+    Map<String, HoodieSavepointPartitionMetadata> partitionMetadata = new 
HashMap<>();
+    List<String> fileNames = paths.stream().map(path -> 
path.substring(path.lastIndexOf("/") + 1)).collect(Collectors.toList());
+    partitionMetadata.put(partition, new 
HoodieSavepointPartitionMetadata(partition, fileNames));
+    return new HoodieSavepointMetadata("user", 1L, "comments", 
partitionMetadata, 1);
   }
 
-  private static Pair<HoodieCleanMetadata, Option<byte[]>> 
getCleanCommitMetadata(List<String> partitions, String instantTime, String 
earliestCommitToRetain,
+  private static Pair<HoodieCleanMetadata, HoodieCleanMetadata> 
getCleanCommitMetadata(List<String> partitions, String instantTime, String 
earliestCommitToRetain,
                                                                                
   String lastCompletedTime, Set<String> savepointsToTrack, Option<String> 
earliestCommitToNotArchive) {
-    try {
-      Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new 
HashMap<>();
-      partitions.forEach(partition -> partitionMetadata.put(partition, new 
HoodieCleanPartitionMetadata(partition, 
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
-          Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), false)));
-      Map<String, String> extraMetadata = new HashMap<>();
-      extraMetadata.put(SAVEPOINTED_TIMESTAMPS, 
savepointsToTrack.stream().collect(Collectors.joining(",")));
-      HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime, 
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
-          CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, 
extraMetadata.isEmpty() ? null : extraMetadata);
-      return Pair.of(cleanMetadata, 
TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata));
-    } catch (IOException ex) {
-      throw new UncheckedIOException(ex);
-    }
+    Map<String, HoodieCleanPartitionMetadata> partitionMetadata = new 
HashMap<>();
+    partitions.forEach(partition -> partitionMetadata.put(partition, new 
HoodieCleanPartitionMetadata(partition, 
HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name(),
+        Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), false)));
+    Map<String, String> extraMetadata = new HashMap<>();
+    extraMetadata.put(SAVEPOINTED_TIMESTAMPS, 
savepointsToTrack.stream().collect(Collectors.joining(",")));
+    HoodieCleanMetadata cleanMetadata = new HoodieCleanMetadata(instantTime, 
100L, 10, earliestCommitToRetain, lastCompletedTime, partitionMetadata,
+        CLEAN_METADATA_VERSION_2, Collections.EMPTY_MAP, 
extraMetadata.isEmpty() ? null : extraMetadata);
+    return Pair.of(cleanMetadata, cleanMetadata);

Review Comment:
   done



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java:
##########
@@ -933,9 +926,8 @@ protected void testMetadataStatsOnCommit(boolean 
populateMetaFields, Function tr
     metaClient = createMetaClient();
     instant = INSTANT_GENERATOR.createNewInstant(COMPLETED, COMMIT_ACTION, 
instantTime1);
     // Read from commit file
-    metadata = metaClient.getCommitMetadataSerDe().deserialize(instant,
-            metaClient.reloadActiveTimeline().getInstantDetails(instant).get(),
-            HoodieCommitMetadata.class);
+    HoodieActiveTimeline activeTimeline1 = metaClient.reloadActiveTimeline();
+    metadata = activeTimeline1.loadInstantContent(instant, 
HoodieCommitMetadata.class);

Review Comment:
   done



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java:
##########
@@ -244,17 +244,15 @@ public void testArchivedInsertOverwriteWithClustering() 
throws Exception {
     HoodieLSMTimelineInstant archived = 
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime),
 metaClient);
     assertEquals(newCommitTime, archived.getInstantTime());
     assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
-    assertDoesNotThrow(() -> 
HoodieReplaceCommitMetadata.fromBytes(archived.getMetadata().array(), 
HoodieReplaceCommitMetadata.class));
-    assertDoesNotThrow(() -> 
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived.getPlan().array()));
+    assertDoesNotThrow(() -> deserializeAvroMetadata(new 
ByteArrayInputStream(archived.getPlan().array()), 
HoodieRequestedReplaceMetadata.class));

Review Comment:
   same



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java:
##########
@@ -244,17 +244,15 @@ public void testArchivedInsertOverwriteWithClustering() 
throws Exception {
     HoodieLSMTimelineInstant archived = 
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime),
 metaClient);
     assertEquals(newCommitTime, archived.getInstantTime());
     assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
-    assertDoesNotThrow(() -> 
HoodieReplaceCommitMetadata.fromBytes(archived.getMetadata().array(), 
HoodieReplaceCommitMetadata.class));
-    assertDoesNotThrow(() -> 
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived.getPlan().array()));
+    assertDoesNotThrow(() -> deserializeAvroMetadata(new 
ByteArrayInputStream(archived.getPlan().array()), 
HoodieRequestedReplaceMetadata.class));
 
     String newCommitTime2 = HoodieTestTable.makeNewCommitTime();
     createReplace(newCommitTime2, WriteOperationType.INSERT_OVERWRITE_TABLE, 
true);
     // test conversion to archived instant
     HoodieLSMTimelineInstant archived2 = 
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime2),
 metaClient);
     assertEquals(newCommitTime2, archived2.getInstantTime());
     assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived2.getAction());
-    assertDoesNotThrow(() -> 
HoodieReplaceCommitMetadata.fromBytes(archived2.getMetadata().array(), 
HoodieReplaceCommitMetadata.class));
-    assertDoesNotThrow(() -> 
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived2.getPlan().array()));
+    assertDoesNotThrow(() -> deserializeAvroMetadata(new 
ByteArrayInputStream(archived2.getPlan().array()), 
HoodieRequestedReplaceMetadata.class));

Review Comment:
   same



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -395,19 +387,15 @@ public static HoodieReplaceCommitMetadata 
convertReplaceCommitMetadataToPojo(org
   }
 
   /**
-   * Convert commit metadata from avro to json.
+   * Convert replacecommit metadata from avro to pojo.
    */
-  public static <T extends SpecificRecordBase> byte[] 
convertCommitMetadataToJsonBytes(T avroMetaData, Class<T> clazz) {
-    Schema avroSchema = clazz == 
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.class ? 
org.apache.hudi.avro.model.HoodieReplaceCommitMetadata.getClassSchema() :
-        org.apache.hudi.avro.model.HoodieCommitMetadata.getClassSchema();
-    try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
-      JsonEncoder jsonEncoder = new JsonEncoder(avroSchema, outputStream);
-      DatumWriter<GenericRecord> writer = avroMetaData instanceof 
SpecificRecord ? new SpecificDatumWriter<>(avroSchema) : new 
GenericDatumWriter<>(avroSchema);
-      writer.write(avroMetaData, jsonEncoder);
-      jsonEncoder.flush();
-      return outputStream.toByteArray();
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to convert to JSON.", e);
+  public static HoodieReplaceCommitMetadata 
convertReplaceCommitMetadataAvroToPojo(org.apache.hudi.avro.model.HoodieReplaceCommitMetadata
 replaceCommitMetadata) {

Review Comment:
   duplicated code, removed.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -173,72 +175,73 @@ public static HoodieArchivedMetaEntry createMetaWrapper(
     archivedMetaWrapper.setActionState(HoodieInstant.State.COMPLETED.name());
     archivedMetaWrapper.setStateTransitionTime(completionTime);
     String actionType = 
lsmTimelineRecord.get(ArchivedTimelineV2.ACTION_ARCHIVED_META_FIELD).toString();
-    HoodieInstant instant = 
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
 actionType, instantTime, completionTime);
+    HoodieInstant hoodieInstant = 
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
 actionType, instantTime, completionTime);
     switch (actionType) {
       case HoodieTimeline.CLEAN_ACTION: {
-        
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 instantDetails.get()));
-        
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 planBytes.get()));
+        
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 new ByteArrayInputStream(instantDetails.get())));
+        
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 new ByteArrayInputStream(planBytes.get())));
         archivedMetaWrapper.setActionType(ActionType.clean.name());
         break;
       }
       case HoodieTimeline.COMMIT_ACTION: {
-        HoodieCommitMetadata commitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(), 
HoodieCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
+        getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
+            .ifPresent(commitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
         archivedMetaWrapper.setActionType(ActionType.commit.name());
 
         if (planBytes.isPresent()) {
           // this should be a compaction
-          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, planBytes);
+          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, new 
ByteArrayInputStream(planBytes.get()));
           archivedMetaWrapper.setHoodieCompactionPlan(plan);
         }
         break;
       }
       case HoodieTimeline.DELTA_COMMIT_ACTION: {
-        HoodieCommitMetadata deltaCommitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(instant, instantDetails.get(), 
HoodieCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
+        getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
+            .ifPresent(commitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
         archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
 
         if (planBytes.isPresent()) {
           // this should be a log compaction
-          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, planBytes);
+          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, new 
ByteArrayInputStream(planBytes.get()));
           archivedMetaWrapper.setHoodieCompactionPlan(plan);
         }
         break;
       }
       case HoodieTimeline.REPLACE_COMMIT_ACTION:
       case HoodieTimeline.CLUSTERING_ACTION: {
-        HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(), 
HoodieReplaceCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata));
+        getCommitMetadata(metaClient, hoodieInstant, 
HoodieReplaceCommitMetadata.class)
+            .ifPresent(replaceCommitMetadata -> 
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata)));
 
         // inflight replacecommit files have the same metadata body as 
HoodieCommitMetadata
         // so we could re-use it without further creating an inflight 
extension.
         // Or inflight replacecommit files are empty under clustering 
circumstance
-        Option<HoodieCommitMetadata> inflightCommitMetadata = 
getInflightCommitMetadata(metaClient, instant, instantDetails);
+        Option<HoodieCommitMetadata> inflightCommitMetadata = 
getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class);
         if (inflightCommitMetadata.isPresent()) {
           
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
         }
-        archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
+        archivedMetaWrapper.setActionType(
+            
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ? 
ActionType.replacecommit.name() : ActionType.clustering.name());

Review Comment:
   done. we discussed offline and figured out the right way



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -107,21 +109,18 @@ public static HoodieCleanMetadata 
convertCleanMetadata(String startCleanTime,
    */
   public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient 
metaClient, HoodieInstant cleanInstant)
       throws IOException {
-    CleanMetadataMigrator metadataMigrator = new 
CleanMetadataMigrator(metaClient);
-    HoodieCleanMetadata cleanMetadata = 
TimelineMetadataUtils.deserializeHoodieCleanMetadata(
-        
metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get());
-    return metadataMigrator.upgradeToLatest(cleanMetadata, 
cleanMetadata.getVersion());
+    HoodieCleanMetadata cleanMetadata = 
metaClient.getActiveTimeline().loadHoodieCleanMetadata(cleanInstant);
+    return upgradeCleanMetadata(metaClient, cleanMetadata);
   }
 
-  /**
-   * Get Latest Version of Hoodie Cleaner Metadata - Output of cleaner 
operation.
-   * @return Latest version of Clean metadata corresponding to clean instant
-   * @throws IOException
-   */
-  public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient 
metaClient, byte[] details)
+  public static HoodieCleanMetadata getCleanerMetadata(HoodieTableMetaClient 
metaClient, InputStream inputStream)
       throws IOException {
+    HoodieCleanMetadata cleanMetadata = deserializeAvroMetadata(inputStream, 
HoodieCleanMetadata.class);
+    return upgradeCleanMetadata(metaClient, cleanMetadata);
+  }

Review Comment:
   We need this because some callers hand-craft cleaner metadata bytes instead of 
reading from the timeline, so we need to expose an API to handle this usage.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineV2.java:
##########
@@ -156,7 +155,11 @@ public Option<byte[]> getInstantDetails(HoodieInstant 
instant) {
 
   @Override
   public InputStream getContentStream(HoodieInstant instant) {
-    return new ByteArrayInputStream(getInstantDetails(instant).orElseGet(() -> 
new byte[0]));
+    Option<InputStream> stream = getInputStreamOptionLegacy(this, instant);
+    if (stream.isEmpty()) {
+      return new ByteArrayInputStream(new byte[]{});
+    }
+    return stream.get();

Review Comment:
   This would require consumer code changes; for example, the archived timeline does not 
use an input stream because its logic is different.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java:
##########
@@ -166,7 +168,11 @@ public Option<byte[]> getInstantDetails(HoodieInstant 
instant) {
 
   @Override
   public InputStream getContentStream(HoodieInstant instant) {
-    return new ByteArrayInputStream(getInstantDetails(instant).orElseGet(() -> 
new byte[0]));
+    Option<InputStream> stream = getInputStreamOptionLegacy(this, instant);
+    if (stream.isEmpty()) {
+      return new ByteArrayInputStream(new byte[]{});
+    }
+    return stream.get();

Review Comment:
   No, we need to keep the same behavior as day 1 because the archived timeline 
also uses it. Supporting stream IO for the archived timeline would require extra effort.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -332,24 +335,13 @@ public static HoodieArchivedMetaEntry 
createMetaWrapperForEmptyInstant(HoodieIns
     return archivedMetaWrapper;
   }
 
-  private static Option<HoodieCommitMetadata> 
getInflightCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant 
instant,
-                                                                        
Option<byte[]> inflightContent) throws IOException {
-    if (!inflightContent.isPresent() || inflightContent.get().length == 0) {
-      // inflight files can be empty in some certain cases, e.g. when users 
opt in clustering
+  private static <T extends HoodieCommitMetadata> Option<T> 
getCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant, 
Class<T> clazz) throws IOException {
+    T commitMetadata = 
metaClient.getActiveTimeline().loadInstantContent(instant, clazz);
+    // an empty file will return the default instance with an UNKNOWN 
operation type and in that case we return an empty option
+    if (commitMetadata.getOperationType() == WriteOperationType.UNKNOWN) {

Review Comment:
   In the real world we should not hit this case — a valid write operation should always 
set a valid operation type; only tests can inject this.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -169,22 +168,14 @@ public static Option<HoodieInstant> 
getEarliestCommitToRetain(
   public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient 
metaClient, HoodieInstant cleanInstant)
       throws IOException {
     CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
-    HoodieCleanerPlan cleanerPlan = 
TimelineMetadataUtils.deserializeAvroMetadata(
-        
metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(), 
HoodieCleanerPlan.class);
+    HoodieCleanerPlan cleanerPlan = 
metaClient.getActiveTimeline().loadCleanerPlan(cleanInstant);
     return cleanPlanMigrator.upgradeToLatest(cleanerPlan, 
cleanerPlan.getVersion());
   }
 
-  /**
-   * Get Latest version of cleaner plan corresponding to a clean instant.
-   *
-   * @param metaClient   Hoodie Table Meta Client
-   * @return Cleaner plan corresponding to clean instant
-   * @throws IOException
-   */
-  public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient 
metaClient, byte[] details)
+  public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient 
metaClient, InputStream in)

Review Comment:
   Same as above: some callers craft arbitrary metadata and obtain bytes for 
deserialization, which requires this API; they do not read from the timeline.



##########
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/versioning/BaseTestCommitMetadataSerDe.java:
##########
@@ -72,7 +85,7 @@ protected void testEmptyMetadataSerDe() throws Exception {
     assertTrue(serialized.isPresent());
 
     // Deserialize
-    HoodieCommitMetadata deserialized = serDe.deserialize(instant, 
serialized.get(), HoodieCommitMetadata.class);
+    HoodieCommitMetadata deserialized = serDe.deserialize(instant, new 
ByteArrayInputStream(serialized.get()), () -> true, HoodieCommitMetadata.class);

Review Comment:
   done



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -244,13 +242,11 @@ private boolean isAnySavepointDeleted(HoodieCleanMetadata 
cleanMetadata) {
   private Stream<String> getPartitionsForInstants(HoodieInstant instant) {
     try {
       if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
-        HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(
-            hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieReplaceCommitMetadata.class);
+        TimelineLayout layout = 
TimelineLayout.fromVersion(hoodieTable.getMetaClient().getTimelineLayoutVersion());

Review Comment:
   removed



##########
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/versioning/BaseTestCommitMetadataSerDe.java:
##########
@@ -178,7 +289,7 @@ private HoodieWriteStat createTestWriteStat() {
     return writeStat;
   }
 
-  private void verifyCommitMetadata(HoodieCommitMetadata metadata) {
+  public void verifyCommitMetadata(HoodieCommitMetadata metadata) {

Review Comment:
   done



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -189,18 +188,17 @@ protected Option<HoodieCleanerPlan> requestClean(String 
startCleanTime) {
     if (lastClean.isPresent()) {
       HoodieInstant cleanInstant = lastClean.get();
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+      HoodieInstant cleanPlanInstant = new 
HoodieInstant(HoodieInstant.State.INFLIGHT, cleanInstant.getAction(), 
cleanInstant.requestedTime(), 
InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR);

Review Comment:
   done



##########
hudi-common/src/test/java/org/apache/hudi/common/table/timeline/versioning/BaseTestCommitMetadataSerDe.java:
##########
@@ -194,7 +305,7 @@ private void 
verifyReplaceCommitMetadata(HoodieReplaceCommitMetadata metadata) {
     assertEquals("test-value-2", 
metadata.getExtraMetadata().get("test-key-2"));
   }
 
-  private void verifyWriteStat(HoodieWriteStat stat) {
+  public void verifyWriteStat(HoodieWriteStat stat) {

Review Comment:
   done



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java:
##########
@@ -206,8 +209,7 @@ public void testArchivedReplace() throws Exception {
     HoodieLSMTimelineInstant archived = 
MetadataConversionUtils.createLSMTimelineInstant(getActiveInstant(newCommitTime),
 metaClient);
     assertEquals(newCommitTime, archived.getInstantTime());
     assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, archived.getAction());
-    assertDoesNotThrow(() -> 
HoodieReplaceCommitMetadata.fromBytes(archived.getMetadata().array(), 
HoodieReplaceCommitMetadata.class));
-    assertDoesNotThrow(() -> 
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(archived.getPlan().array()));
+    assertDoesNotThrow(() -> deserializeAvroMetadata(new 
ByteArrayInputStream(archived.getPlan().array()), 
HoodieRequestedReplaceMetadata.class));

Review Comment:
   HoodieReplaceCommitMetadata.fromBytes should not be used at all, so this 
coverage is not needed



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java:
##########
@@ -38,11 +38,12 @@
 import org.apache.hudi.exception.HoodieTimeTravelException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
-

Review Comment:
   done



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/CommitMetadataSerDeV1.java:
##########
@@ -24,30 +24,41 @@
 import org.apache.hudi.common.util.JsonUtils;
 import org.apache.hudi.common.util.Option;
 
+import org.apache.avro.specific.SpecificRecordBase;
+
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.BooleanSupplier;
 
-import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
+import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadata;
 
 public class CommitMetadataSerDeV1 implements CommitMetadataSerDe {
 
   @Override
-  public <T> T deserialize(HoodieInstant instant, byte[] bytes, Class<T> 
clazz) throws IOException {
+  public <T> T deserialize(HoodieInstant instant, InputStream inputStream, 
BooleanSupplier isEmptyInstant, Class<T> clazz) throws IOException {
     try {
-      if (bytes.length == 0) {
-        return clazz.newInstance();
+      // For commit metadata we need special case handling as they are using 
non avro type in memory.

Review Comment:
   done



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java:
##########
@@ -56,100 +52,106 @@
  */
 public class MetadataConversionUtils {
 
-  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant 
hoodieInstant, HoodieTableMetaClient metaClient) throws IOException {
-    Option<byte[]> instantDetails = 
metaClient.getActiveTimeline().getInstantDetails(hoodieInstant);
-    if (hoodieInstant.isCompleted() && instantDetails.get().length == 0) {
-      // in local FS and HDFS, there could be empty completed instants due to 
crash.
-      // let's add an entry to the archival, even if not for the plan.
-      return createMetaWrapperForEmptyInstant(hoodieInstant);
-    }
-    HoodieArchivedMetaEntry archivedMetaWrapper = new 
HoodieArchivedMetaEntry();
-    archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime());
-    archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
-    
archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime());
-    switch (hoodieInstant.getAction()) {
-      case HoodieTimeline.CLEAN_ACTION: {
-        if (hoodieInstant.isCompleted()) {
-          
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 instantDetails.get()));
-        } else {
-          
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 instantDetails.get()));
+  public static HoodieArchivedMetaEntry createMetaWrapper(HoodieInstant 
hoodieInstant, HoodieTableMetaClient metaClient) {
+    try {
+      HoodieArchivedMetaEntry archivedMetaWrapper = new 
HoodieArchivedMetaEntry();
+      archivedMetaWrapper.setCommitTime(hoodieInstant.requestedTime());
+      archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+      
archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getCompletionTime());
+      CommitMetadataSerDe serDe = metaClient.getCommitMetadataSerDe();
+      switch (hoodieInstant.getAction()) {
+        case HoodieTimeline.CLEAN_ACTION: {
+          if (hoodieInstant.isCompleted()) {
+            
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
 hoodieInstant));
+          } else {
+            
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
 hoodieInstant));
+          }
+          archivedMetaWrapper.setActionType(ActionType.clean.name());
+          break;
         }
-        archivedMetaWrapper.setActionType(ActionType.clean.name());
-        break;
-      }
-      case HoodieTimeline.COMMIT_ACTION: {
-        HoodieCommitMetadata commitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, 
instantDetails.get(), HoodieCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
-        archivedMetaWrapper.setActionType(ActionType.commit.name());
-        break;
-      }
-      case HoodieTimeline.DELTA_COMMIT_ACTION: {
-        HoodieCommitMetadata deltaCommitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, 
instantDetails.get(), HoodieCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata));
-        archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
-        break;
-      }
-      case HoodieTimeline.REPLACE_COMMIT_ACTION:
-      case HoodieTimeline.CLUSTERING_ACTION: {
-        if (hoodieInstant.isCompleted()) {
-          HoodieReplaceCommitMetadata replaceCommitMetadata = 
HoodieReplaceCommitMetadata.fromBytes(instantDetails.get(), 
HoodieReplaceCommitMetadata.class);
-          
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata));
-        } else if (hoodieInstant.isInflight()) {
-          // inflight replacecommit files have the same metadata body as 
HoodieCommitMetadata
-          // so we could re-use it without further creating an inflight 
extension.
-          // Or inflight replacecommit files are empty under clustering 
circumstance
-          Option<HoodieCommitMetadata> inflightCommitMetadata = 
getInflightCommitMetadata(metaClient, hoodieInstant, instantDetails);
-          if (inflightCommitMetadata.isPresent()) {
-            
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
+        case HoodieTimeline.COMMIT_ACTION: {
+          getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
+              .ifPresent(commitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata)));
+          archivedMetaWrapper.setActionType(ActionType.commit.name());
+          break;
+        }
+        case HoodieTimeline.DELTA_COMMIT_ACTION: {
+          getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
+              .ifPresent(deltaCommitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(deltaCommitMetadata)));
+          archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
+          break;
+        }
+        case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        case HoodieTimeline.CLUSTERING_ACTION: {
+          if (hoodieInstant.isCompleted()) {
+            getCommitMetadata(metaClient, hoodieInstant, 
HoodieReplaceCommitMetadata.class)
+                .ifPresent(replaceCommitMetadata -> 
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertReplaceCommitMetadataToAvro(replaceCommitMetadata)));
+          } else if (hoodieInstant.isInflight()) {
+            // inflight replacecommit files have the same metadata body as 
HoodieCommitMetadata
+            // so we could re-use it without further creating an inflight 
extension.
+            // Or inflight replacecommit files are empty under clustering 
circumstance
+            Option<HoodieCommitMetadata> inflightCommitMetadata = 
getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class);
+            if (inflightCommitMetadata.isPresent()) {
+              
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadata(inflightCommitMetadata.get()));
+            }
+          } else {
+            // we may have cases with empty HoodieRequestedReplaceMetadata 
e.g. insert_overwrite_table or insert_overwrite
+            // without clustering. However, we should revisit the requested 
commit file standardization
+            Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = 
Option.of(metaClient.getActiveTimeline()
+                .loadRequestedReplaceMetadata(hoodieInstant));
+            if (requestedReplaceMetadata.isPresent()) {
+              
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
+            }
           }
-        } else {
-          // we may have cases with empty HoodieRequestedReplaceMetadata e.g. 
insert_overwrite_table or insert_overwrite
-          // without clustering. However, we should revisit the requested 
commit file standardization
-          Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = 
getRequestedReplaceMetadata(instantDetails);
-          if (requestedReplaceMetadata.isPresent()) {
-            
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(requestedReplaceMetadata.get());
+          archivedMetaWrapper.setActionType(
+              
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ? 
ActionType.replacecommit.name() : ActionType.clustering.name());
+          break;
+        }
+        case HoodieTimeline.ROLLBACK_ACTION: {
+          if (hoodieInstant.isCompleted()) {
+            
archivedMetaWrapper.setHoodieRollbackMetadata(metaClient.getActiveTimeline().loadInstantContent(hoodieInstant,
 HoodieRollbackMetadata.class));
           }
+          archivedMetaWrapper.setActionType(ActionType.rollback.name());
+          break;
         }
-        archivedMetaWrapper.setActionType(
-            
hoodieInstant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION) ? 
ActionType.replacecommit.name() : ActionType.clustering.name());
-        break;
-      }
-      case HoodieTimeline.ROLLBACK_ACTION: {
-        if (hoodieInstant.isCompleted()) {
-          
archivedMetaWrapper.setHoodieRollbackMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
 HoodieRollbackMetadata.class));
+        case HoodieTimeline.SAVEPOINT_ACTION: {
+          
archivedMetaWrapper.setHoodieSavePointMetadata(metaClient.getActiveTimeline().loadInstantContent(hoodieInstant,
 HoodieSavepointMetadata.class));
+          archivedMetaWrapper.setActionType(ActionType.savepoint.name());
+          break;
         }
-        archivedMetaWrapper.setActionType(ActionType.rollback.name());
-        break;
-      }
-      case HoodieTimeline.SAVEPOINT_ACTION: {
-        
archivedMetaWrapper.setHoodieSavePointMetadata(TimelineMetadataUtils.deserializeAvroMetadata(instantDetails.get(),
 HoodieSavepointMetadata.class));
-        archivedMetaWrapper.setActionType(ActionType.savepoint.name());
-        break;
-      }
-      case HoodieTimeline.COMPACTION_ACTION: {
-        if (hoodieInstant.isRequested()) {
-          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
-          archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        case HoodieTimeline.COMPACTION_ACTION: {
+          if (hoodieInstant.isRequested()) {
+            HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+            archivedMetaWrapper.setHoodieCompactionPlan(plan);
+          }
+          archivedMetaWrapper.setActionType(ActionType.compaction.name());
+          break;
         }
-        archivedMetaWrapper.setActionType(ActionType.compaction.name());
-        break;
-      }
-      case HoodieTimeline.LOG_COMPACTION_ACTION: {
-        if (hoodieInstant.isRequested()) {
-          HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, instantDetails);
-          archivedMetaWrapper.setHoodieCompactionPlan(plan);
+        case HoodieTimeline.LOG_COMPACTION_ACTION: {
+          if (hoodieInstant.isRequested()) {
+            HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+            archivedMetaWrapper.setHoodieCompactionPlan(plan);
+          }
+          archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
+          break;
+        }
+        default: {
+          throw new UnsupportedOperationException("Action not fully supported 
yet");
         }
-        archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
-        break;
       }
-      default: {
-        throw new UnsupportedOperationException("Action not fully supported 
yet");
+      return archivedMetaWrapper;
+    } catch (IOException | HoodieIOException ex) {
+      if (metaClient.getActiveTimeline().isEmpty(hoodieInstant)) {
+        // in local FS and HDFS, there could be empty completed instants due 
to crash.
+        // let's add an entry to the archival, even if not for the plan.
+        return createMetaWrapperForEmptyInstant(hoodieInstant);
       }
+      throw new HoodieException(ex);
     }
-    return archivedMetaWrapper;
   }
 
   /**
+   * TODO(reviewers) - new code applied similar refactoring, please pay close 
attention.

Review Comment:
   done



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineV2.java:
##########
@@ -249,4 +251,9 @@ public HoodieTimeline getWriteTimeline() {
             readCommits.containsKey(i.requestedTime()))
         .filter(s -> validActions.contains(s.getAction())), instantReader);
   }
+
+  @Override
+  public boolean isEmpty(HoodieInstant instant) {
+    return getInstantDetails(instant).isEmpty();

Review Comment:
   no — for the archived timeline we would need extra effort to support an 
input stream.



##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java:
##########
@@ -168,7 +169,7 @@ public void testCommitMetadataSerde() throws Exception {
     HoodieInstant instant = 
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, "commit", 
"1");
     org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata1 =
         COMMIT_METADATA_SER_DE.deserialize(instant,
-            serializedCommitMetadata, 
org.apache.hudi.common.model.HoodieCommitMetadata.class);
+            new ByteArrayInputStream(serializedCommitMetadata), () -> true, 
org.apache.hudi.common.model.HoodieCommitMetadata.class);

Review Comment:
   done



##########
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java:
##########
@@ -169,22 +168,14 @@ public static Option<HoodieInstant> 
getEarliestCommitToRetain(
   public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient 
metaClient, HoodieInstant cleanInstant)
       throws IOException {
     CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
-    HoodieCleanerPlan cleanerPlan = 
TimelineMetadataUtils.deserializeAvroMetadata(
-        
metaClient.getActiveTimeline().readCleanerInfoAsBytes(cleanInstant).get(), 
HoodieCleanerPlan.class);
+    HoodieCleanerPlan cleanerPlan = 
metaClient.getActiveTimeline().loadCleanerPlan(cleanInstant);
     return cleanPlanMigrator.upgradeToLatest(cleanerPlan, 
cleanerPlan.getVersion());
   }
 
-  /**
-   * Get Latest version of cleaner plan corresponding to a clean instant.
-   *
-   * @param metaClient   Hoodie Table Meta Client
-   * @return Cleaner plan corresponding to clean instant
-   * @throws IOException
-   */
-  public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient 
metaClient, byte[] details)
+  public static HoodieCleanerPlan getCleanerPlan(HoodieTableMetaClient 
metaClient, InputStream in)
       throws IOException {
     CleanPlanMigrator cleanPlanMigrator = new CleanPlanMigrator(metaClient);
-    HoodieCleanerPlan cleanerPlan = 
TimelineMetadataUtils.deserializeAvroMetadata(details, HoodieCleanerPlan.class);
+    HoodieCleanerPlan cleanerPlan = 
TimelineMetadataUtils.deserializeAvroMetadata(in, HoodieCleanerPlan.class);

Review Comment:
   removed unused function



##########
hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java:
##########
@@ -190,22 +189,47 @@ private static Option<HoodieRequestedReplaceMetadata> 
getRequestedReplaceMetadat
       String action = factory instanceof InstantGeneratorV2 ? 
HoodieTimeline.CLUSTERING_ACTION : HoodieTimeline.REPLACE_COMMIT_ACTION;
       requestedInstant = 
factory.createNewInstant(HoodieInstant.State.REQUESTED, action, 
pendingReplaceOrClusterInstant.requestedTime());
     }
-    Option<byte[]> content = Option.empty();
     try {
-      content = timeline.getInstantDetails(requestedInstant);
+      // First assume the instant file is not empty and parse it.
+      return getHoodieRequestedReplaceMetadataOption(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant);
+    } catch (Exception ex) {
+      // If anything goes wrong, check if this is empty file.
+      if (isEmptyReplaceOrClusteringInstant(timeline, 
pendingReplaceOrClusterInstant, factory, requestedInstant)) {
+        return Option.empty();
+      }
+      // If still no luck, throw the exception.
+      throw ex;
+    }
+  }
+
+  private static Option<HoodieRequestedReplaceMetadata> 
getHoodieRequestedReplaceMetadataOption(

Review Comment:
   done



##########
hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java:
##########
@@ -182,7 +183,7 @@ public void testCommitMetadataSerde() throws Exception {
     byte[] v1Bytes = v1SerDe.serialize(commitMetadata1).get();
     System.out.println(new String(v1Bytes));
     org.apache.hudi.common.model.HoodieCommitMetadata commitMetadata2 =
-        COMMIT_METADATA_SER_DE.deserialize(legacyInstant, v1Bytes, 
org.apache.hudi.common.model.HoodieCommitMetadata.class);
+        COMMIT_METADATA_SER_DE.deserialize(legacyInstant, new 
ByteArrayInputStream(v1Bytes), () -> true, 
org.apache.hudi.common.model.HoodieCommitMetadata.class);

Review Comment:
   done



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java:
##########
@@ -135,9 +136,9 @@ private ConsistentBucketIdentifier 
getBucketIdentifier(String partition) {
   public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {
     HoodieTimeline timeline = 
writeClient.getHoodieTable().getActiveTimeline().getCompletedReplaceTimeline().findInstantsAfter(lastRefreshInstant);
     if (!timeline.empty()) {
+      TimelineLayout layout = 
TimelineLayout.fromVersion(timeline.getTimelineLayoutVersion());

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to