This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch release-1.0.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 224c6a3b2ac48336ce39ffc542ca1d4e92f03c7d
Author: Lokesh Jain <[email protected]>
AuthorDate: Thu Apr 10 06:30:53 2025 +0530

    [HUDI-9263] Archived timeline downgrade fails with EightToSevenDowngradeHandler (#13098)
    
    ---------
    
    Co-authored-by: sivabalan <[email protected]>
    (cherry picked from commit ea5b3c346d803067f7cdde03f63b6f8e3998b876)
---
 .../apache/hudi/io/TestHoodieTimelineArchiver.java | 112 +++++++++++++++++++--
 .../table/timeline/MetadataConversionUtils.java    |  37 +++++--
 2 files changed, 127 insertions(+), 22 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 2a12a34b7ce..2cab3c4736a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -43,6 +44,7 @@ import 
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.LSMTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
@@ -70,6 +72,8 @@ import org.apache.hudi.storage.HoodieInstantWriter;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper;
+import org.apache.hudi.table.upgrade.UpgradeDowngrade;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
 import org.junit.jupiter.api.AfterEach;
@@ -110,6 +114,11 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
 import static 
org.apache.hudi.common.table.timeline.MetadataConversionUtils.convertCommitMetadataToAvro;
@@ -566,7 +575,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000005"));
     
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003", 
"00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION));
     
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
-    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008"),
 HoodieTimeline.CLEAN_ACTION));
+    
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008"),
 CLEAN_ACTION));
     verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002")),
         expectedActiveInstants, commitsAfterArchival, false);
 
@@ -581,7 +590,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
       // retains the 2 commits - C3 and C7. Since minInstantsToKeep is 2, c3 
is retained. Archival is now blocked at
       // c7 since that is the replace commit after earliest savepoint c7 in 
cleaner
       expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000003", "00000007"), 
HoodieTimeline.REPLACE_COMMIT_ACTION);
-      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), HoodieTimeline.CLEAN_ACTION));
+      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), CLEAN_ACTION));
       
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
       List<HoodieInstant> archivedCommitInstants = 
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005"));
       
archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000004",
 "00000006"), HoodieTimeline.REPLACE_COMMIT_ACTION));
@@ -591,7 +600,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
       expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000005"));
       
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003", 
"00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION));
       
expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003")));
-      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), HoodieTimeline.CLEAN_ACTION));
+      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), CLEAN_ACTION));
       verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", 
"00000002")),
           expectedActiveInstants, commitsAfterArchival, false);
     }
@@ -606,7 +615,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     if (archiveBeyondSavepoint) {
       // change from last state - Removal of savepoint instant from the active 
timeline since it is deleted
       expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000003", "00000007"), 
HoodieTimeline.REPLACE_COMMIT_ACTION);
-      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), HoodieTimeline.CLEAN_ACTION));
+      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), CLEAN_ACTION));
       List<HoodieInstant> archivedCommitInstants = 
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005"));
       
archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000004",
 "00000006"), HoodieTimeline.REPLACE_COMMIT_ACTION));
       verifyArchival(archivedCommitInstants, expectedActiveInstants, 
commitsAfterArchival, true);
@@ -616,7 +625,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
       // archival is triggered since clean also does not block it
       // c6 and c7 are retained since min instants to keep is 2
       expectedActiveInstants = 
getActiveCommitInstants(Arrays.asList("00000006", "00000007"), 
HoodieTimeline.REPLACE_COMMIT_ACTION);
-      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), HoodieTimeline.CLEAN_ACTION));
+      
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", 
"00000009"), CLEAN_ACTION));
       List<HoodieInstant> archivedCommitInstants = 
getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005"));
       
archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000003",
 "00000004"), HoodieTimeline.REPLACE_COMMIT_ACTION));
       verifyArchival(archivedCommitInstants, expectedActiveInstants, 
commitsAfterArchival, false);
@@ -742,6 +751,87 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     }
   }
 
+  @Test
+  public void testDowngradeArchivedTimeline() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 1, 
2, 5, HoodieTableType.MERGE_ON_READ);
+
+    // do ingestion and trigger archive actions here.
+    Map<String, Integer> cleanStats = new HashMap<>();
+    cleanStats.put("p1", 1);
+    cleanStats.put("p2", 2);
+    for (int i = 1; i < 17; i += 2) {
+      if (i == 3) {
+        testTable.doRollback(String.format("%08d", 1), String.format("%08d", 
3));
+      } else if (i == 5) {
+        testTable.doCluster(String.format("%08d", i), Collections.emptyMap(), 
Arrays.asList("p1", "p2"), 20);
+      } else if (i == 7 || i == 13) {
+        testTable.doCompaction(String.format("%08d", i), Arrays.asList("p1", 
"p2"));
+      } else {
+        testTable.doWriteOperation(String.format("%08d", i), 
WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : 
Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+        testTable.doClean(String.format("%08d", i + 1), cleanStats, 
Collections.emptyMap());
+      }
+    }
+    testTable.doCompaction(String.format("%08d", 17), Arrays.asList("p1", 
"p2"));
+
+    // 1 - dc, 2- clean, 3 - rollback, 5 -> clustering, 7 -> compaction, 9 -> 
dc, 10 -> clean. 11 -> dc,
+    // 12 -> clean. 13 -> compaction, 15 -> dc, 16 -> clean, 17 -> compaction
+    Pair<List<HoodieInstant>, List<HoodieInstant>> result = 
archiveAndGetCommitsList(writeConfig);
+    // after archival, only instants 16 and 17 are in active timeline.
+    List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
+    //List<String> expectedArchivedInstants = Arrays.asList(new 
String[]{String.format("%08d",1), String.format("%08d",2), 
String.format("%08d",12)})
+    expectedActiveInstants.add(getHoodieInstant(CLEAN_ACTION, 
String.format("%08d",16)));
+    expectedActiveInstants.add(getHoodieInstant(COMMIT_ACTION, 
String.format("%08d",17)));
+
+    // validate active instants
+    List<HoodieInstant> actualActiveInstants = new 
ArrayList<>(result.getRight());
+    Collections.sort(actualActiveInstants);
+    Collections.sort(expectedActiveInstants);
+    assertEquals(expectedActiveInstants, actualActiveInstants);
+
+    List<HoodieInstant> actualArchivedCommits = new 
ArrayList<>(result.getKey());
+    actualArchivedCommits.removeAll(result.getValue());
+
+    List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
+    expectedArchivedInstants.add(getHoodieInstant(DELTA_COMMIT_ACTION, 
String.format("%08d",1)));
+    expectedArchivedInstants.add(getHoodieInstant(CLEAN_ACTION, 
String.format("%08d",2)));
+    expectedArchivedInstants.add(getHoodieInstant(ROLLBACK_ACTION, 
String.format("%08d",3)));
+    expectedArchivedInstants.add(getHoodieInstant(REPLACE_COMMIT_ACTION, 
String.format("%08d",5)));
+    expectedArchivedInstants.add(getHoodieInstant(COMMIT_ACTION, 
String.format("%08d",7)));
+    expectedArchivedInstants.add(getHoodieInstant(DELTA_COMMIT_ACTION, 
String.format("%08d",9)));
+    expectedArchivedInstants.add(getHoodieInstant(CLEAN_ACTION, 
String.format("%08d",10)));
+    expectedArchivedInstants.add(getHoodieInstant(DELTA_COMMIT_ACTION, 
String.format("%08d",11)));
+    expectedArchivedInstants.add(getHoodieInstant(CLEAN_ACTION, 
String.format("%08d",12)));
+    expectedArchivedInstants.add(getHoodieInstant(COMMIT_ACTION, 
String.format("%08d",13)));
+    expectedArchivedInstants.add(getHoodieInstant(DELTA_COMMIT_ACTION, 
String.format("%08d",15)));
+
+    // validate archived instants
+    Collections.sort(actualArchivedCommits);
+    Collections.sort(expectedArchivedInstants);
+    assertEquals(expectedArchivedInstants, actualArchivedCommits);
+
+    // loading archived timeline instants
+    HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline();
+    archivedTimeLine.loadCompletedInstantDetailsInMemory();
+
+    // Downgrade to table version 6
+    new UpgradeDowngrade(metaClient, writeConfig, context, 
SparkUpgradeDowngradeHelper.getInstance())
+        .run(HoodieTableVersion.SIX, null);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    metaClient.getArchivedTimeline().loadCompletedInstantDetailsInMemory();
+    HoodieTimeline downgradedArchivedTimeline = 
metaClient.getArchivedTimeline();
+    // verify expected archived instants
+    expectedArchivedInstants.forEach(instant -> 
assertTrue(downgradedArchivedTimeline.containsInstant(instant)));
+    // verify the contents of older archived timeline and downgraded archived 
timeline
+    for (HoodieInstant instant : archivedTimeLine.getInstants()) {
+      
assertTrue(Arrays.equals(archivedTimeLine.getInstantReader().getInstantDetails(instant).get(),
+          
downgradedArchivedTimeline.getInstantReader().getInstantDetails(instant).get()));
+    }
+  }
+
+  private HoodieInstant getHoodieInstant(String action, String instantTime) {
+    return new HoodieInstant(State.COMPLETED, action, instantTime, 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testArchivalWithMultiWriters(boolean enableMetadata) throws 
Exception {
@@ -1129,7 +1219,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
 
   private void verifyInflightInstants(HoodieTableMetaClient metaClient, int 
expectedTotalInstants) {
     HoodieTimeline timeline = metaClient.getActiveTimeline().reload()
-        
.getTimelineOfActions(Collections.singleton(HoodieTimeline.CLEAN_ACTION)).filterInflights();
+        
.getTimelineOfActions(Collections.singleton(CLEAN_ACTION)).filterInflights();
     assertEquals(expectedTotalInstants, timeline.countInstants(),
         "Loaded inflight clean actions and the count should match");
   }
@@ -1178,7 +1268,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
         List<HoodieInstant> expectedActiveInstants = new 
ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008")));
         List<HoodieInstant> expectedArchiveInstants = new ArrayList<>();
         
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001",
 "00000004", "00000006")));
-        
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
 "00000003", "00000005"), HoodieTimeline.CLEAN_ACTION));
+        
expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002",
 "00000003", "00000005"), CLEAN_ACTION));
 
         verifyArchival(expectedArchiveInstants, expectedActiveInstants, 
commitsAfterArchival, false);
       }
@@ -1239,11 +1329,11 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
         List<HoodieInstant> expectedActiveInstants = new ArrayList<>();
         
expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000009", 
"00000010", "00000011", "00000012")));
         expectedActiveInstants.addAll(
-            getActiveCommitInstants(Arrays.asList("00000013", "00000014", 
"00000015", "00000016"), HoodieTimeline.CLEAN_ACTION));
+            getActiveCommitInstants(Arrays.asList("00000013", "00000014", 
"00000015", "00000016"), CLEAN_ACTION));
         List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
         expectedArchivedInstants.addAll(getAllArchivedCommitInstants(
             Arrays.asList("00000001", "00000002", "00000003", "00000004", 
"00000005", "00000008"), HoodieTimeline.COMMIT_ACTION));
-        
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000006"),
 HoodieTimeline.CLEAN_ACTION));
+        
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000006"),
 CLEAN_ACTION));
         
expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000007"),
 HoodieTimeline.REPLACE_COMMIT_ACTION));
         verifyArchival(expectedArchivedInstants, expectedActiveInstants, 
commitsAfterArchival, false);
       }
@@ -1268,7 +1358,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
 
     for (int i = 2; i < 5; i++) {
       String cleanInstant = metaClient.createNewInstantTime();
-      instants.add(Pair.of(cleanInstant, HoodieTimeline.CLEAN_ACTION));
+      instants.add(Pair.of(cleanInstant, CLEAN_ACTION));
       testTable.doClean(cleanInstant, partitionToFileDeleteCount);
     }
 
@@ -1324,7 +1414,7 @@ public class TestHoodieTimelineArchiver extends 
HoodieSparkClientTestHarness {
     List<HoodieInstant> expectedArchivedInstants = new ArrayList<>();
     for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) {
       createCleanMetadata(String.format("%02d", startInstant), false, false, 
isEmpty || i % 2 == 0);
-      
expectedArchivedInstants.add(INSTANT_GENERATOR.createNewInstant(State.COMPLETED,
 HoodieTimeline.CLEAN_ACTION, String.format("%02d", startInstant)));
+      
expectedArchivedInstants.add(INSTANT_GENERATOR.createNewInstant(State.COMPLETED,
 CLEAN_ACTION, String.format("%02d", startInstant)));
     }
 
     for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
index b9a7d5dada0..06be0391f2c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
@@ -23,6 +23,8 @@ import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieLSMTimelineInstant;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
@@ -186,8 +188,9 @@ public class MetadataConversionUtils {
         break;
       }
       case HoodieTimeline.COMMIT_ACTION: {
-        getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
-            .ifPresent(commitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadataToAvro(commitMetadata)));
+        HoodieCommitMetadata commitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, new 
ByteArrayInputStream(instantDetails.get()),
+            () -> instantDetails.get().length == 0, 
HoodieCommitMetadata.class);
+        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadataToAvro(commitMetadata));
         archivedMetaWrapper.setActionType(ActionType.commit.name());
 
         if (planBytes.isPresent()) {
@@ -198,8 +201,9 @@ public class MetadataConversionUtils {
         break;
       }
       case HoodieTimeline.DELTA_COMMIT_ACTION: {
-        getCommitMetadata(metaClient, hoodieInstant, 
HoodieCommitMetadata.class)
-            .ifPresent(commitMetadata -> 
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadataToAvro(commitMetadata)));
+        HoodieCommitMetadata deltaCommitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, new 
ByteArrayInputStream(instantDetails.get()),
+            () -> instantDetails.get().length == 0, 
HoodieCommitMetadata.class);
+        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadataToAvro(deltaCommitMetadata));
         archivedMetaWrapper.setActionType(ActionType.deltacommit.name());
 
         if (planBytes.isPresent()) {
@@ -211,13 +215,14 @@ public class MetadataConversionUtils {
       }
       case HoodieTimeline.REPLACE_COMMIT_ACTION:
       case HoodieTimeline.CLUSTERING_ACTION: {
-        getCommitMetadata(metaClient, hoodieInstant, 
HoodieReplaceCommitMetadata.class)
-            .ifPresent(replaceCommitMetadata -> 
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertCommitMetadataToAvro(replaceCommitMetadata)));
+        HoodieCommitMetadata replaceCommitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, new 
ByteArrayInputStream(instantDetails.get()),
+            () -> instantDetails.get().length == 0, 
HoodieReplaceCommitMetadata.class);
+        
archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertCommitMetadataToAvro(replaceCommitMetadata));
 
         // inflight replacecommit files have the same metadata body as 
HoodieCommitMetadata
         // so we could re-use it without further creating an inflight 
extension.
         // Or inflight replacecommit files are empty under clustering 
circumstance
-        Option<HoodieCommitMetadata> inflightCommitMetadata = 
getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class);
+        Option<HoodieCommitMetadata> inflightCommitMetadata = 
getInflightCommitMetadata(metaClient, hoodieInstant, instantDetails);
         if (inflightCommitMetadata.isPresent()) {
           
archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadataToAvro(inflightCommitMetadata.get()));
         }
@@ -226,25 +231,25 @@ public class MetadataConversionUtils {
       }
       case HoodieTimeline.ROLLBACK_ACTION: {
         archivedMetaWrapper.setHoodieRollbackMetadata(
-            
metaClient.getActiveTimeline().readRollbackMetadata(hoodieInstant));
+            TimelineMetadataUtils.deserializeAvroMetadata(new 
ByteArrayInputStream(instantDetails.get()), HoodieRollbackMetadata.class));
         archivedMetaWrapper.setActionType(ActionType.rollback.name());
         break;
       }
       case HoodieTimeline.SAVEPOINT_ACTION: {
         archivedMetaWrapper.setHoodieSavePointMetadata(
-            
metaClient.getActiveTimeline().readSavepointMetadata(hoodieInstant));
+            TimelineMetadataUtils.deserializeAvroMetadata(new 
ByteArrayInputStream(instantDetails.get()), HoodieSavepointMetadata.class));
         archivedMetaWrapper.setActionType(ActionType.savepoint.name());
         break;
       }
       case HoodieTimeline.COMPACTION_ACTION: {
         // should be handled by commit_action branch though, this logic is 
redundant.
-        HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+        HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, new 
ByteArrayInputStream(planBytes.get()));
         archivedMetaWrapper.setHoodieCompactionPlan(plan);
         archivedMetaWrapper.setActionType(ActionType.compaction.name());
         break;
       }
       case HoodieTimeline.LOG_COMPACTION_ACTION: {
-        HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant);
+        HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, new 
ByteArrayInputStream(planBytes.get()));
         archivedMetaWrapper.setHoodieCompactionPlan(plan);
         archivedMetaWrapper.setActionType(ActionType.logcompaction.name());
         break;
@@ -256,6 +261,16 @@ public class MetadataConversionUtils {
     return archivedMetaWrapper;
   }
 
+  private static Option<HoodieCommitMetadata> 
getInflightCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant 
instant,
+                                                                        
Option<byte[]> inflightContent) throws IOException {
+    if (!inflightContent.isPresent() || inflightContent.get().length == 0) {
+      // inflight files can be empty in some certain cases, e.g. when users 
opt in clustering
+      return Option.empty();
+    }
+    return Option.of(metaClient.getCommitMetadataSerDe().deserialize(instant, 
new ByteArrayInputStream(inflightContent.get()),
+        () -> inflightContent.get().length == 0, HoodieCommitMetadata.class));
+  }
+
   public static HoodieLSMTimelineInstant createLSMTimelineInstant(ActiveAction 
activeAction, HoodieTableMetaClient metaClient) {
     HoodieLSMTimelineInstant lsmTimelineInstant = new 
HoodieLSMTimelineInstant();
     lsmTimelineInstant.setInstantTime(activeAction.getInstantTime());

Reply via email to