This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch release-1.0.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 224c6a3b2ac48336ce39ffc542ca1d4e92f03c7d Author: Lokesh Jain <[email protected]> AuthorDate: Thu Apr 10 06:30:53 2025 +0530 [HUDI-9263] Archived timeline downgrade fails with EightToSevenDowngradeHandler (#13098) --------- Co-authored-by: sivabalan <[email protected]> (cherry picked from commit ea5b3c346d803067f7cdde03f63b6f8e3998b876) --- .../apache/hudi/io/TestHoodieTimelineArchiver.java | 112 +++++++++++++++++++-- .../table/timeline/MetadataConversionUtils.java | 37 +++++-- 2 files changed, 127 insertions(+), 22 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 2a12a34b7ce..2cab3c4736a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -43,6 +44,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.LSMTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; +import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.FileCreateUtilsLegacy; import org.apache.hudi.common.testutils.HoodieMetadataTestTable; @@ -70,6 +72,8 @@ import org.apache.hudi.storage.HoodieInstantWriter; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.upgrade.SparkUpgradeDowngradeHelper; +import org.apache.hudi.table.upgrade.UpgradeDowngrade; import org.apache.hudi.testutils.HoodieSparkClientTestHarness; import org.junit.jupiter.api.AfterEach; @@ -110,6 +114,11 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import static org.apache.hudi.HoodieTestCommitGenerator.getBaseFilename; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.ROLLBACK_ACTION; import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN; import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps; import static org.apache.hudi.common.table.timeline.MetadataConversionUtils.convertCommitMetadataToAvro; @@ -566,7 +575,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { expectedActiveInstants = getActiveCommitInstants(Arrays.asList("00000005")); expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003", "00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION)); expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003"))); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008"), HoodieTimeline.CLEAN_ACTION)); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008"), CLEAN_ACTION)); verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002")), expectedActiveInstants, commitsAfterArchival, false); @@ -581,7 +590,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { // retains the 2 commits - C3 and C7. Since minInstantsToKeep is 2, c3 is retained. Archival is now blocked at // c7 since that is the replace commit after earliest savepoint c7 in cleaner expectedActiveInstants = getActiveCommitInstants(Arrays.asList("00000003", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000009"), HoodieTimeline.CLEAN_ACTION)); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000009"), CLEAN_ACTION)); expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003"))); List<HoodieInstant> archivedCommitInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005")); archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000004", "00000006"), HoodieTimeline.REPLACE_COMMIT_ACTION)); @@ -591,7 +600,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { expectedActiveInstants = getActiveCommitInstants(Arrays.asList("00000005")); expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000003", "00000004", "00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION)); expectedActiveInstants.addAll(getActiveSavepointedCommitInstants(Arrays.asList("00000003"))); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000009"), HoodieTimeline.CLEAN_ACTION)); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000009"), CLEAN_ACTION)); verifyArchival(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002")), expectedActiveInstants, commitsAfterArchival, false); } @@ -606,7 +615,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { if (archiveBeyondSavepoint) { // change from last state - Removal of savepoint instant from the active timeline since it is deleted expectedActiveInstants = getActiveCommitInstants(Arrays.asList("00000003", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000009"), HoodieTimeline.CLEAN_ACTION)); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000009"), CLEAN_ACTION)); List<HoodieInstant> archivedCommitInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005")); archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000004", "00000006"), HoodieTimeline.REPLACE_COMMIT_ACTION)); verifyArchival(archivedCommitInstants, expectedActiveInstants, commitsAfterArchival, true); @@ -616,7 +625,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { // archival is triggered since clean also does not block it // c6 and c7 are retained since min instants to keep is 2 expectedActiveInstants = getActiveCommitInstants(Arrays.asList("00000006", "00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION); - expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000009"), HoodieTimeline.CLEAN_ACTION)); + expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000008", "00000009"), CLEAN_ACTION)); List<HoodieInstant> archivedCommitInstants = getAllArchivedCommitInstants(Arrays.asList("00000001", "00000002", "00000005")); archivedCommitInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000003", "00000004"), HoodieTimeline.REPLACE_COMMIT_ACTION)); verifyArchival(archivedCommitInstants, expectedActiveInstants, commitsAfterArchival, false); @@ -742,6 +751,87 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { } } + @Test + public void testDowngradeArchivedTimeline() throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 1, 2, 5, HoodieTableType.MERGE_ON_READ); + + // do ingestion and trigger archive actions here. + Map<String, Integer> cleanStats = new HashMap<>(); + cleanStats.put("p1", 1); + cleanStats.put("p2", 2); + for (int i = 1; i < 17; i += 2) { + if (i == 3) { + testTable.doRollback(String.format("%08d", 1), String.format("%08d", 3)); + } else if (i == 5) { + testTable.doCluster(String.format("%08d", i), Collections.emptyMap(), Arrays.asList("p1", "p2"), 20); + } else if (i == 7 || i == 13) { + testTable.doCompaction(String.format("%08d", i), Arrays.asList("p1", "p2")); + } else { + testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + testTable.doClean(String.format("%08d", i + 1), cleanStats, Collections.emptyMap()); + } + } + testTable.doCompaction(String.format("%08d", 17), Arrays.asList("p1", "p2")); + + // 1 - dc, 2- clean, 3 - rollback, 5 -> clustering, 7 -> compaction, 9 -> dc, 10 -> clean. 11 -> dc, + // 12 -> clean. 13 -> compaction, 15 -> dc, 16 -> clean, 17 -> compaction + Pair<List<HoodieInstant>, List<HoodieInstant>> result = archiveAndGetCommitsList(writeConfig); + // after archival, only instants 16 and 17 are in active timeline. + List<HoodieInstant> expectedActiveInstants = new ArrayList<>(); + //List<String> expectedArchivedInstants = Arrays.asList(new String[]{String.format("%08d",1), String.format("%08d",2), String.format("%08d",12)}) + expectedActiveInstants.add(getHoodieInstant(CLEAN_ACTION, String.format("%08d",16))); + expectedActiveInstants.add(getHoodieInstant(COMMIT_ACTION, String.format("%08d",17))); + + // validate active instants + List<HoodieInstant> actualActiveInstants = new ArrayList<>(result.getRight()); + Collections.sort(actualActiveInstants); + Collections.sort(expectedActiveInstants); + assertEquals(expectedActiveInstants, actualActiveInstants); + + List<HoodieInstant> actualArchivedCommits = new ArrayList<>(result.getKey()); + actualArchivedCommits.removeAll(result.getValue()); + + List<HoodieInstant> expectedArchivedInstants = new ArrayList<>(); + expectedArchivedInstants.add(getHoodieInstant(DELTA_COMMIT_ACTION, String.format("%08d",1))); + expectedArchivedInstants.add(getHoodieInstant(CLEAN_ACTION, String.format("%08d",2))); + expectedArchivedInstants.add(getHoodieInstant(ROLLBACK_ACTION, String.format("%08d",3))); + expectedArchivedInstants.add(getHoodieInstant(REPLACE_COMMIT_ACTION, String.format("%08d",5))); + expectedArchivedInstants.add(getHoodieInstant(COMMIT_ACTION, String.format("%08d",7))); + expectedArchivedInstants.add(getHoodieInstant(DELTA_COMMIT_ACTION, String.format("%08d",9))); + expectedArchivedInstants.add(getHoodieInstant(CLEAN_ACTION, String.format("%08d",10))); + expectedArchivedInstants.add(getHoodieInstant(DELTA_COMMIT_ACTION, String.format("%08d",11))); + expectedArchivedInstants.add(getHoodieInstant(CLEAN_ACTION, String.format("%08d",12))); + expectedArchivedInstants.add(getHoodieInstant(COMMIT_ACTION, String.format("%08d",13))); + expectedArchivedInstants.add(getHoodieInstant(DELTA_COMMIT_ACTION, String.format("%08d",15))); + + // validate archived instants + Collections.sort(actualArchivedCommits); + Collections.sort(expectedArchivedInstants); + assertEquals(expectedArchivedInstants, actualArchivedCommits); + + // loading archived timeline instants + HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline(); + archivedTimeLine.loadCompletedInstantDetailsInMemory(); + + // Downgrade to table version 6 + new UpgradeDowngrade(metaClient, writeConfig, context, SparkUpgradeDowngradeHelper.getInstance()) + .run(HoodieTableVersion.SIX, null); + metaClient = HoodieTableMetaClient.reload(metaClient); + metaClient.getArchivedTimeline().loadCompletedInstantDetailsInMemory(); + HoodieTimeline downgradedArchivedTimeline = metaClient.getArchivedTimeline(); + // verify expected archived instants + expectedArchivedInstants.forEach(instant -> assertTrue(downgradedArchivedTimeline.containsInstant(instant))); + // verify the contents of older archived timeline and downgraded archived timeline + for (HoodieInstant instant : archivedTimeLine.getInstants()) { + assertTrue(Arrays.equals(archivedTimeLine.getInstantReader().getInstantDetails(instant).get(), + downgradedArchivedTimeline.getInstantReader().getInstantDetails(instant).get())); + } + } + + private HoodieInstant getHoodieInstant(String action, String instantTime) { + return new HoodieInstant(State.COMPLETED, action, instantTime, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testArchivalWithMultiWriters(boolean enableMetadata) throws Exception { @@ -1129,7 +1219,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { private void verifyInflightInstants(HoodieTableMetaClient metaClient, int expectedTotalInstants) { HoodieTimeline timeline = metaClient.getActiveTimeline().reload() - .getTimelineOfActions(Collections.singleton(HoodieTimeline.CLEAN_ACTION)).filterInflights(); + .getTimelineOfActions(Collections.singleton(CLEAN_ACTION)).filterInflights(); assertEquals(expectedTotalInstants, timeline.countInstants(), "Loaded inflight clean actions and the count should match"); } @@ -1178,7 +1268,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { List<HoodieInstant> expectedActiveInstants = new ArrayList<>(getActiveCommitInstants(Arrays.asList("00000007", "00000008"))); List<HoodieInstant> expectedArchiveInstants = new ArrayList<>(); expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000001", "00000004", "00000006"))); - expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000005"), HoodieTimeline.CLEAN_ACTION)); + expectedArchiveInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000002", "00000003", "00000005"), CLEAN_ACTION)); verifyArchival(expectedArchiveInstants, expectedActiveInstants, commitsAfterArchival, false); } @@ -1239,11 +1329,11 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { List<HoodieInstant> expectedActiveInstants = new ArrayList<>(); expectedActiveInstants.addAll(getActiveCommitInstants(Arrays.asList("00000009", "00000010", "00000011", "00000012"))); expectedActiveInstants.addAll( - getActiveCommitInstants(Arrays.asList("00000013", "00000014", "00000015", "00000016"), HoodieTimeline.CLEAN_ACTION)); + getActiveCommitInstants(Arrays.asList("00000013", "00000014", "00000015", "00000016"), CLEAN_ACTION)); List<HoodieInstant> expectedArchivedInstants = new ArrayList<>(); expectedArchivedInstants.addAll(getAllArchivedCommitInstants( Arrays.asList("00000001", "00000002", "00000003", "00000004", "00000005", "00000008"), HoodieTimeline.COMMIT_ACTION)); - expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000006"), HoodieTimeline.CLEAN_ACTION)); + expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000006"), CLEAN_ACTION)); expectedArchivedInstants.addAll(getAllArchivedCommitInstants(Arrays.asList("00000007"), HoodieTimeline.REPLACE_COMMIT_ACTION)); verifyArchival(expectedArchivedInstants, expectedActiveInstants, commitsAfterArchival, false); } @@ -1268,7 +1358,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { for (int i = 2; i < 5; i++) { String cleanInstant = metaClient.createNewInstantTime(); - instants.add(Pair.of(cleanInstant, HoodieTimeline.CLEAN_ACTION)); + instants.add(Pair.of(cleanInstant, CLEAN_ACTION)); testTable.doClean(cleanInstant, partitionToFileDeleteCount); } @@ -1324,7 +1414,7 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness { List<HoodieInstant> expectedArchivedInstants = new ArrayList<>(); for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant++) { createCleanMetadata(String.format("%02d", startInstant), false, false, isEmpty || i % 2 == 0); - expectedArchivedInstants.add(INSTANT_GENERATOR.createNewInstant(State.COMPLETED, HoodieTimeline.CLEAN_ACTION, String.format("%02d", startInstant))); + expectedArchivedInstants.add(INSTANT_GENERATOR.createNewInstant(State.COMPLETED, CLEAN_ACTION, String.format("%02d", startInstant))); } for (int i = 0; i < maxInstantsToKeep + 1; i++, startInstant += 2) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java index b9a7d5dada0..06be0391f2c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java @@ -23,6 +23,8 @@ import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.avro.model.HoodieLSMTimelineInstant; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; +import org.apache.hudi.avro.model.HoodieRollbackMetadata; +import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.common.model.ActionType; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; @@ -186,8 +188,9 @@ public class MetadataConversionUtils { break; } case HoodieTimeline.COMMIT_ACTION: { - getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class) - .ifPresent(commitMetadata -> archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadataToAvro(commitMetadata))); + HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, new ByteArrayInputStream(instantDetails.get()), + () -> instantDetails.get().length == 0, HoodieCommitMetadata.class); + archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadataToAvro(commitMetadata)); archivedMetaWrapper.setActionType(ActionType.commit.name()); if (planBytes.isPresent()) { @@ -198,8 +201,9 @@ public class MetadataConversionUtils { break; } case HoodieTimeline.DELTA_COMMIT_ACTION: { - getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class) - .ifPresent(commitMetadata -> archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadataToAvro(commitMetadata))); + HoodieCommitMetadata deltaCommitMetadata = metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, new ByteArrayInputStream(instantDetails.get()), + () -> instantDetails.get().length == 0, HoodieCommitMetadata.class); + archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadataToAvro(deltaCommitMetadata)); archivedMetaWrapper.setActionType(ActionType.deltacommit.name()); if (planBytes.isPresent()) { @@ -211,13 +215,14 @@ public class MetadataConversionUtils { } case HoodieTimeline.REPLACE_COMMIT_ACTION: case HoodieTimeline.CLUSTERING_ACTION: { - getCommitMetadata(metaClient, hoodieInstant, HoodieReplaceCommitMetadata.class) - .ifPresent(replaceCommitMetadata -> archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertCommitMetadataToAvro(replaceCommitMetadata))); + HoodieCommitMetadata replaceCommitMetadata = metaClient.getCommitMetadataSerDe().deserialize(hoodieInstant, new ByteArrayInputStream(instantDetails.get()), + () -> instantDetails.get().length == 0, HoodieReplaceCommitMetadata.class); + archivedMetaWrapper.setHoodieReplaceCommitMetadata(convertCommitMetadataToAvro(replaceCommitMetadata)); // inflight replacecommit files have the same metadata body as HoodieCommitMetadata // so we could re-use it without further creating an inflight extension. // Or inflight replacecommit files are empty under clustering circumstance - Option<HoodieCommitMetadata> inflightCommitMetadata = getCommitMetadata(metaClient, hoodieInstant, HoodieCommitMetadata.class); + Option<HoodieCommitMetadata> inflightCommitMetadata = getInflightCommitMetadata(metaClient, hoodieInstant, instantDetails); if (inflightCommitMetadata.isPresent()) { archivedMetaWrapper.setHoodieInflightReplaceMetadata(convertCommitMetadataToAvro(inflightCommitMetadata.get())); } @@ -226,25 +231,25 @@ public class MetadataConversionUtils { } case HoodieTimeline.ROLLBACK_ACTION: { archivedMetaWrapper.setHoodieRollbackMetadata( - metaClient.getActiveTimeline().readRollbackMetadata(hoodieInstant)); + TimelineMetadataUtils.deserializeAvroMetadata(new ByteArrayInputStream(instantDetails.get()), HoodieRollbackMetadata.class)); archivedMetaWrapper.setActionType(ActionType.rollback.name()); break; } case HoodieTimeline.SAVEPOINT_ACTION: { archivedMetaWrapper.setHoodieSavePointMetadata( - metaClient.getActiveTimeline().readSavepointMetadata(hoodieInstant)); + TimelineMetadataUtils.deserializeAvroMetadata(new ByteArrayInputStream(instantDetails.get()), HoodieSavepointMetadata.class)); archivedMetaWrapper.setActionType(ActionType.savepoint.name()); break; } case HoodieTimeline.COMPACTION_ACTION: { // should be handled by commit_action branch though, this logic is redundant. - HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, hoodieInstant); + HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, new ByteArrayInputStream(planBytes.get())); archivedMetaWrapper.setHoodieCompactionPlan(plan); archivedMetaWrapper.setActionType(ActionType.compaction.name()); break; } case HoodieTimeline.LOG_COMPACTION_ACTION: { - HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, hoodieInstant); + HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, new ByteArrayInputStream(planBytes.get())); archivedMetaWrapper.setHoodieCompactionPlan(plan); archivedMetaWrapper.setActionType(ActionType.logcompaction.name()); break; @@ -256,6 +261,16 @@ public class MetadataConversionUtils { return archivedMetaWrapper; } + private static Option<HoodieCommitMetadata> getInflightCommitMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant, + Option<byte[]> inflightContent) throws IOException { + if (!inflightContent.isPresent() || inflightContent.get().length == 0) { + // inflight files can be empty in some certain cases, e.g. when users opt in clustering + return Option.empty(); + } + return Option.of(metaClient.getCommitMetadataSerDe().deserialize(instant, new ByteArrayInputStream(inflightContent.get()), + () -> inflightContent.get().length == 0, HoodieCommitMetadata.class)); + } + public static HoodieLSMTimelineInstant createLSMTimelineInstant(ActiveAction activeAction, HoodieTableMetaClient metaClient) { HoodieLSMTimelineInstant lsmTimelineInstant = new HoodieLSMTimelineInstant(); lsmTimelineInstant.setInstantTime(activeAction.getInstantTime());
