This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 687bf6b04511331d51dad398657554986d9ab13c Author: Sivabalan Narayanan <[email protected]> AuthorDate: Wed May 15 07:44:17 2024 -0700 [HUDI-7532] Include only compaction instants for lastCompaction in getDeltaCommitsSinceLatestCompaction (#10915) * Fixing schedule compaction bug * Addressing comments * Fixing CDC tests --- .../hudi/cli/commands/CompactionCommand.java | 2 +- .../hudi/cli/commands/FileSystemViewCommand.java | 2 +- .../hudi/cli/commands/HoodieLogFileCommand.java | 2 +- .../apache/hudi/cli/commands/RepairsCommand.java | 4 +- .../org/apache/hudi/cli/commands/StatsCommand.java | 2 +- .../java/org/apache/hudi/cli/utils/CommitUtil.java | 2 +- .../apache/hudi/cli/commands/TestTableCommand.java | 6 +-- .../hudi/cli/integ/ITTestSavepointsCommand.java | 6 +-- .../index/bucket/ConsistentBucketIndexUtils.java | 2 +- .../metadata/HoodieBackedTableMetadataWriter.java | 2 +- .../table/action/commit/JavaUpsertPartitioner.java | 2 +- .../hudi/client/TestJavaHoodieBackedMetadata.java | 14 +++--- .../TestHoodieJavaClientOnCopyOnWriteStorage.java | 2 +- .../testutils/HoodieJavaClientTestHarness.java | 2 +- .../java/org/apache/hudi/client/TestMultiFS.java | 4 +- .../hudi/client/TestTableSchemaEvolution.java | 4 +- .../functional/TestHoodieBackedMetadata.java | 14 +++--- .../TestHoodieClientOnCopyOnWriteStorage.java | 2 +- .../org/apache/hudi/io/TestHoodieMergeHandle.java | 8 +-- .../java/org/apache/hudi/table/TestCleaner.java | 2 +- .../hudi/table/TestHoodieMergeOnReadTable.java | 6 +-- .../table/action/compact/TestInlineCompaction.java | 6 +-- .../TestCopyOnWriteRollbackActionExecutor.java | 2 +- ...dieSparkMergeOnReadTableInsertUpdateDelete.java | 4 +- .../TestHoodieSparkMergeOnReadTableRollback.java | 6 +-- .../hudi/testutils/HoodieClientTestBase.java | 2 +- .../SparkClientFunctionalTestHarness.java | 4 +- .../hudi/common/table/HoodieTableMetaClient.java | 6 +-- .../table/timeline/HoodieDefaultTimeline.java | 11 +++- 
.../apache/hudi/common/util/CompactionUtils.java | 3 +- .../hudi/metadata/HoodieBackedTableMetadata.java | 2 +- .../common/table/TestHoodieTableMetaClient.java | 8 +-- .../hudi/common/table/TestTimelineUtils.java | 12 ++--- .../table/timeline/TestHoodieActiveTimeline.java | 44 ++++++++++------ .../hudi/common/util/TestCompactionUtils.java | 58 ++++++++++++++++++++++ .../RepairAddpartitionmetaProcedure.scala | 2 +- .../RepairMigratePartitionMetaProcedure.scala | 2 +- .../ShowHoodieLogFileRecordsProcedure.scala | 2 +- .../StatsWriteAmplificationProcedure.scala | 2 +- .../procedures/ValidateHoodieSyncProcedure.scala | 2 +- .../src/test/java/HoodieJavaStreamingApp.java | 4 +- .../hudi/functional/TestMORDataSourceStorage.scala | 2 +- .../hudi/functional/TestStructuredStreaming.scala | 2 +- .../functional/cdc/TestCDCDataFrameSuite.scala | 26 +++++----- .../sql/hudi/procedure/TestRepairsProcedure.scala | 8 +-- .../deltastreamer/HoodieDeltaStreamerTestBase.java | 4 +- .../deltastreamer/TestHoodieDeltaStreamer.java | 2 +- 47 files changed, 197 insertions(+), 119 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java index 1679a327007..6a297e868e0 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java @@ -316,7 +316,7 @@ public class CompactionCommand { .filter(pair -> pair.getRight() != null) .collect(Collectors.toList()); - Set<String> committedInstants = timeline.getCommitTimeline().filterCompletedInstants() + Set<String> committedInstants = timeline.getCommitAndReplaceTimeline().filterCompletedInstants() .getInstantsAsStream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); List<Comparable[]> rows = new ArrayList<>(); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java index cbb2ae2177c..e9a3a3c922a 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java @@ -247,7 +247,7 @@ public class FileSystemViewCommand { HoodieTimeline timeline; if (basefileOnly) { - timeline = metaClient.getActiveTimeline().getCommitTimeline(); + timeline = metaClient.getActiveTimeline().getCommitAndReplaceTimeline(); } else if (excludeCompaction) { timeline = metaClient.getActiveTimeline().getCommitsTimeline(); } else { diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index 307ca81cea0..b4c72021ee6 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -232,7 +232,7 @@ public class HoodieLogFileCommand { .withReaderSchema(readerSchema) .withLatestInstantTime( client.getActiveTimeline() - .getCommitTimeline().lastInstant().get().getTimestamp()) + .getCommitAndReplaceTimeline().lastInstant().get().getTimestamp()) .withReverseReader( Boolean.parseBoolean( HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue())) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index 8783e749057..2418976c4e4 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -118,7 +118,7 @@ public class RepairsCommand { HoodieTableMetaClient client = HoodieCLI.getTableMetaClient(); String latestCommit = - client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); + 
client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().get().getTimestamp(); List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.storage, client.getBasePath()); StoragePath basePath = client.getBasePathV2(); @@ -239,7 +239,7 @@ public class RepairsCommand { Option<StoragePath> baseFormatFile = HoodiePartitionMetadata.baseFormatMetaPathIfExists(HoodieCLI.storage, partition); String latestCommit = - client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(); + client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().get().getTimestamp(); String[] row = new String[] { partitionPath, diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java index f8e60ba8cee..9f859bf72bf 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java @@ -69,7 +69,7 @@ public class StatsCommand { long totalRecordsWritten = 0; HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().getActiveTimeline(); - HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = activeTimeline.getCommitAndReplaceTimeline().filterCompletedInstants(); List<Comparable[]> rows = new ArrayList<>(); DecimalFormat df = new DecimalFormat("#.00"); diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java index 21910fd956d..12322617fb2 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java @@ -36,7 +36,7 @@ public class CommitUtil { public static long countNewRecords(HoodieTableMetaClient metaClient, List<String> commitsToCatchup) throws IOException { long totalNew = 0; - HoodieTimeline 
timeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); for (String commit : commitsToCatchup) { HoodieCommitMetadata c = HoodieCommitMetadata.fromBytes( timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get(), diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java index c3bbbef0cf4..87bb2b7d406 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java @@ -192,7 +192,7 @@ public class TestTableCommand extends CLIFunctionalTestHarness { assertTrue(prepareTable()); HoodieTimeline timeline = - HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); assertEquals(0, timeline.countInstants(), "There should have no instant at first"); // generate four savepoints @@ -203,14 +203,14 @@ public class TestTableCommand extends CLIFunctionalTestHarness { // Before refresh, no instant timeline = - HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); assertEquals(0, timeline.countInstants(), "there should have no instant"); Object result = shell.evaluate(() -> command); assertTrue(ShellEvaluationResultUtil.isSuccess(result)); timeline = - HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieCLI.getTableMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); // After refresh, there are 4 
instants assertEquals(4, timeline.countInstants(), "there should have 4 instants"); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java index 8f1d07b4eb5..ced1cf7a3ef 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java @@ -137,7 +137,7 @@ public class ITTestSavepointsCommand extends HoodieCLIIntegrationTestBase { assertEquals(1, timeline.getRestoreTimeline().countInstants()); // 103 instant had rollback - assertFalse(timeline.getCommitTimeline().containsInstant( + assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant( new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103"))); } @@ -182,9 +182,9 @@ public class ITTestSavepointsCommand extends HoodieCLIIntegrationTestBase { assertEquals(1, timeline.getRestoreTimeline().countInstants()); // 103 and 104 instant had rollback - assertFalse(timeline.getCommitTimeline().containsInstant( + assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant( new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103"))); - assertFalse(timeline.getCommitTimeline().containsInstant( + assertFalse(timeline.getCommitAndReplaceTimeline().containsInstant( new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "104"))); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java index 99b5d833f50..6023b17ce0d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java @@ -143,7 +143,7 @@ public class ConsistentBucketIndexUtils { && 
maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()))) { return loadMetadataFromGivenFile(table, maxMetadataFile); } - HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); // fix the in-consistency between un-committed and committed hashing metadata files. List<FileStatus> fixed = new ArrayList<>(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index dd292830a85..46323954a5b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -1330,7 +1330,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instantTime) { Option<HoodieInstant> lastCompletedCompactionInstant = metadataMetaClient.reloadActiveTimeline() - .getCommitTimeline().filterCompletedInstants().lastInstant(); + .getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant(); if (lastCompletedCompactionInstant.isPresent() && metadataMetaClient.getActiveTimeline().filterCompletedInstants() .findInstantsAfter(lastCompletedCompactionInstant.get().getTimestamp()).countInstants() < 3) { diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java index 8703ffb9de0..7084ae013e4 100644 --- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java @@ -132,7 +132,7 @@ public class JavaUpsertPartitioner<T> implements Partitioner { // for new inserts, compute buckets depending on how many records we have for each partition Set<String> partitionPaths = profile.getPartitionPaths(); long averageRecordSize = - averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), + averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(), config); LOG.info("AvgRecordSize => " + averageRecordSize); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java index 1c26fb82001..d697c192221 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java @@ -1716,7 +1716,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "0000004"))); // Compaction may occur if the commits completed in order - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClients[0]); @@ -1763,7 +1763,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { // 6 commits and 2 cleaner commits. 
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClient); } @@ -2034,7 +2034,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { // There should not be any compaction yet and we have not performed more than maxDeltaCommitsBeforeCompaction // deltacommits (1 will be due to bootstrap) HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 0); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 0); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction - 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2044,7 +2044,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { client.startCommitWithTime(newCommitTime); client.insert(records, newCommitTime); metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction + 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2065,7 +2065,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { // Ensure no more compactions took place due to the 
leftover inflight commit metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in metadata table */); @@ -2080,7 +2080,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { // Ensure compactions took place metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 2); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in metadata table */)); assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 0); @@ -2428,7 +2428,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase { client.upsert(records, newCommitTime); } } - assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3); + assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 3); try (HoodieJavaWriteClient client = new HoodieJavaWriteClient(engineContext, writeConfig)) { // Perform a clean diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java index 6f5352e2a34..0d4b77ec43d 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java @@ -520,7 +520,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe 0, 150); HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false); - List<HoodieInstant> instants = activeTimeline.getCommitTimeline().getInstants(); + List<HoodieInstant> instants = activeTimeline.getCommitAndReplaceTimeline().getInstants(); assertEquals(5, instants.size()); assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), instants.get(0)); diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java index 430f8f01a5e..1e43a4d3840 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java @@ -867,7 +867,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest // verify that there is a commit HoodieTableMetaClient metaClient = createMetaClient(); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); if (assertForCommit) { assertEquals(3, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(), diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java index 007097a0a6c..230f684d165 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java @@ -135,7 +135,7 @@ public class TestMultiFS extends HoodieSparkClientTestHarness { // Read from hdfs FileSystem fs = HadoopFSUtils.getFs(dfsBasePath, HoodieTestUtils.getDefaultStorageConf()); HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(HadoopFSUtils.getStorageConf(fs.getConf()), dfsBasePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime); assertEquals(readRecords.count(), records.size()); @@ -156,7 +156,7 @@ public class TestMultiFS extends HoodieSparkClientTestHarness { LOG.info("Reading from path: " + tablePath); fs = HadoopFSUtils.getFs(tablePath, HoodieTestUtils.getDefaultStorageConf()); metaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(fs.getConf()), tablePath); - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); Dataset<Row> localReadRecords = HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime); assertEquals(localReadRecords.count(), localRecords.size()); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index aeb0627744e..9ed2dce3ce5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -309,7 +309,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { (String s, Integer a) -> evolvedRecords, SparkRDDWriteClient::insert, true, numRecords, 3 * numRecords, 6, false); // new commit - HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline curTimeline = metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("006")); checkReadRecords("000", 3 * numRecords); @@ -333,7 +333,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException { if (tableType == HoodieTableType.COPY_ON_WRITE) { - HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitTimeline(); + HoodieTimeline timeline = metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline(); assertEquals(numExpectedRecords, HoodieClientTestUtils.countRecordsOptionallySince(jsc, basePath, sqlContext, timeline, Option.of(instantTime))); } else { // TODO: This code fails to read records under the following conditions: diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 30b1b63998d..3dfb61c2cea 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -2131,7 +2131,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { assertTrue(metadataMetaClient.getActiveTimeline().containsInstant(new HoodieInstant(false, 
HoodieTimeline.DELTA_COMMIT_ACTION, "0000004"))); // Compaction may occur if the commits completed in order - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClients[0]); @@ -2179,7 +2179,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // 6 commits and 2 cleaner commits. assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8); - assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1); + assertTrue(metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants() <= 1); // Validation validateMetadata(writeClient); } @@ -2444,7 +2444,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // There should not be any compaction yet and we have not performed more than maxDeltaCommitsBeforeCompaction // deltacommits (1 will be due to bootstrap) HoodieActiveTimeline metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 0); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 0); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction - 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2454,7 +2454,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { client.startCommitWithTime(newCommitTime); client.insert(jsc.parallelize(records, 1), newCommitTime).collect(); metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - 
assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getCommitsTimeline().filterCompletedInstants().countInstants(), maxDeltaCommitsBeforeCompaction + 1); assertEquals(datasetMetaClient.getArchivedTimeline().reload().countInstants(), 0); @@ -2475,7 +2475,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // Ensure no more compactions took place due to the leftover inflight commit metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 1); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 1); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction /* clean from dataset */) + 1)/* clean in metadata table */); @@ -2490,7 +2490,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { // Ensure compactions took place metadataTimeline = metadataMetaClient.reloadActiveTimeline(); - assertEquals(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants(), 2); + assertEquals(metadataTimeline.getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 2); assertEquals(metadataTimeline.getDeltaCommitTimeline().filterCompletedInstants().countInstants(), ((2 * maxDeltaCommitsBeforeCompaction) + (maxDeltaCommitsBeforeCompaction + 1 /* clean from dataset */) + 2 /* clean in metadata table */)); assertTrue(datasetMetaClient.getArchivedTimeline().reload().countInstants() > 0); @@ -3120,7 +3120,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { client.upsert(jsc.parallelize(records, 1), newCommitTime).collect(); } } - 
assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3); + assertEquals(metaClient.reloadActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().countInstants(), 3); try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) { // Perform a clean diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java index 0db85ae69c1..74e998349ea 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java @@ -685,7 +685,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { 0, 150); HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient, false); - List<HoodieInstant> instants = activeTimeline.getCommitTimeline().getInstants(); + List<HoodieInstant> instants = activeTimeline.getCommitAndReplaceTimeline().getInstants(); assertEquals(5, instants.size()); assertEquals(new HoodieInstant(COMPLETED, COMMIT_ACTION, "001"), instants.get(0)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index c451f4bd938..ad612ee5c9b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -121,7 +121,7 @@ public class TestHoodieMergeHandle extends HoodieSparkClientTestHarness { // verify that there is a commit metaClient = HoodieTableMetaClient.reload(metaClient); - 
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit."); assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), "Latest commit should be 001"); @@ -147,7 +147,7 @@ public class TestHoodieMergeHandle extends HoodieSparkClientTestHarness { // verify that there are 2 commits metaClient = HoodieTableMetaClient.reload(metaClient); - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(2, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting two commits."); assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), "Latest commit should be 002"); Dataset<Row> dataSet = getRecords(); @@ -167,7 +167,7 @@ public class TestHoodieMergeHandle extends HoodieSparkClientTestHarness { // verify that there are now 3 commits metaClient = HoodieTableMetaClient.reload(metaClient); - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(3, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting three commits."); assertEquals(newCommitTime, timeline.lastInstant().get().getTimestamp(), "Latest commit should be 003"); dataSet = getRecords(); @@ -197,7 +197,7 @@ public class TestHoodieMergeHandle extends HoodieSparkClientTestHarness { assertNoWriteErrors(statuses); // verify there are now 4 commits - timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(4, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting four 
commits."); assertEquals(timeline.lastInstant().get().getTimestamp(), newCommitTime, "Latest commit should be 004"); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 723fa6b1614..2de9f5d3784 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -154,7 +154,7 @@ public class TestCleaner extends HoodieCleanerTestBase { assertNoWriteErrors(statuses.collect()); // verify that there is a commit metaClient = HoodieTableMetaClient.reload(metaClient); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); assertEquals(1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants(), "Expecting a single commit."); // Should have 100 records in table (check using Index), all in locations marked at commit HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java index f037f46a309..9e1f4277c57 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java @@ -161,7 +161,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness assertTrue(deltaCommit.isPresent()); assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option<HoodieInstant> 
commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable); @@ -195,7 +195,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness assertTrue(deltaCommit.isPresent()); assertEquals("002", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 002"); - commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); allFiles = listAllBaseFilesInPath(hoodieTable); @@ -653,7 +653,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness assertTrue(deltaCommit.isPresent()); assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java index 209d70e499a..f271356bcb9 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java @@ -270,7 +270,7 @@ public class TestInlineCompaction extends CompactionTestBase { // Then: 1 delta commit is done, the failed compaction is retried metaClient = createMetaClient(cfg.getBasePath()); assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); - 
assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); + assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } @Test @@ -308,7 +308,7 @@ public class TestInlineCompaction extends CompactionTestBase { metaClient = createMetaClient(cfg.getBasePath()); // 2 delta commits at the beginning. 1 compaction, 1 delta commit following it. assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); - assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); + assertEquals(instantTime, metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } @Test @@ -345,6 +345,6 @@ public class TestInlineCompaction extends CompactionTestBase { // Then: 1 delta commit is done, the failed compaction is retried metaClient = createMetaClient(cfg.getBasePath()); assertEquals(4, metaClient.getActiveTimeline().getWriteTimeline().countInstants()); - assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); + assertEquals(instantTime, metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().firstInstant().get().getTimestamp()); } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java index 00ff11b57d0..e78ed757e8f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java +++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java @@ -289,7 +289,7 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT //2. rollback HoodieInstant commitInstant; if (isUsingMarkers) { - commitInstant = table.getActiveTimeline().getCommitTimeline().filterInflights().lastInstant().get(); + commitInstant = table.getActiveTimeline().getCommitAndReplaceTimeline().filterInflights().lastInstant().get(); } else { commitInstant = table.getCompletedCommitTimeline().lastInstant().get(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java index 8e85208af6f..dd1d6c2431a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java @@ -284,7 +284,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie assertTrue(deltaCommit.isPresent()); assertEquals("001", deltaCommit.get().getTimestamp(), "Delta commit should be 001"); - Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable); @@ -327,7 +327,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie assertTrue(deltaCommit.isPresent()); assertEquals("004", deltaCommit.get().getTimestamp(), "Latest Delta commit should be 004"); - commit = 
metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); allFiles = listAllBaseFilesInPath(hoodieTable); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 10d26f83698..c08026946c0 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -123,7 +123,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends TestHoodieSparkRoll client.commit(newCommitTime, jsc().parallelize(statuses)); metaClient = HoodieTableMetaClient.reload(metaClient); - Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertTrue(commit.isPresent()); assertEquals("001", commit.get().getTimestamp(), "commit should be 001"); @@ -199,7 +199,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends TestHoodieSparkRoll assertTrue(deltaCommit.isPresent()); assertEquals("000000001", deltaCommit.get().getTimestamp(), "Delta commit should be 000000001"); - Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable); @@ -505,7 +505,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends TestHoodieSparkRoll assertEquals(200, 
getTotalRecordsWritten(instantCommitMetadataPairOpt.get().getValue())); Option<HoodieInstant> commit = - metaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java index 09aff48224d..b41c15a9898 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java @@ -529,7 +529,7 @@ public class HoodieClientTestBase extends HoodieSparkClientTestHarness { // verify that there is a commit HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, basePath); - HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitAndReplaceTimeline(); if (assertForCommit) { assertEquals(3, timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants(), diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index e45578211cb..79dda856367 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -289,7 +289,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe "Delta commit should be specified value"); Option<HoodieInstant> commit 
= - reloadedMetaClient.getActiveTimeline().getCommitTimeline().lastInstant(); + reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant(); assertFalse(commit.isPresent()); List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable); @@ -337,7 +337,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe "Latest Delta commit should match specified time"); Option<HoodieInstant> commit = - reloadedMetaClient.getActiveTimeline().getCommitTimeline().firstInstant(); + reloadedMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant(); assertFalse(commit.isPresent()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 319cbdfbb4a..436a8c221fe 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -547,7 +547,7 @@ public class HoodieTableMetaClient implements Serializable { public HoodieTimeline getCommitsTimeline() { switch (this.getTableType()) { case COPY_ON_WRITE: - return getActiveTimeline().getCommitTimeline(); + return getActiveTimeline().getCommitAndReplaceTimeline(); case MERGE_ON_READ: // We need to include the parquet files written out in delta commits // Include commit action to be able to start doing a MOR over a COW table - no @@ -567,7 +567,7 @@ public class HoodieTableMetaClient implements Serializable { public HoodieTimeline getCommitsAndCompactionTimeline() { switch (this.getTableType()) { case COPY_ON_WRITE: - return getActiveTimeline().getCommitTimeline(); + return getActiveTimeline().getCommitAndReplaceTimeline(); case MERGE_ON_READ: return getActiveTimeline().getWriteTimeline(); default: @@ -583,7 +583,7 @@ public class HoodieTableMetaClient implements Serializable { case COPY_ON_WRITE: case MERGE_ON_READ: // 
We need to include the parquet files written out in delta commits in tagging - return getActiveTimeline().getCommitTimeline(); + return getActiveTimeline().getCommitAndReplaceTimeline(); default: throw new HoodieException("Unsupported table type :" + this.getTableType()); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 68cf428d364..12ea0085d51 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -318,13 +318,20 @@ public class HoodieDefaultTimeline implements HoodieTimeline { } /** - * Get only pure commits (inflight and completed) in the active timeline. + * Get only pure commit and replace commits (inflight and completed) in the active timeline. */ - public HoodieTimeline getCommitTimeline() { + public HoodieTimeline getCommitAndReplaceTimeline() { //TODO: Make sure this change does not break existing functionality. return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, REPLACE_COMMIT_ACTION)); } + /** + * Get only pure commits (inflight and completed) in the active timeline. + */ + public HoodieTimeline getCommitTimeline() { + return getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)); + } + /** * Get only the delta commits (inflight and completed) in the active timeline. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java index 0f41f1314e1..4ef30a2656a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java @@ -285,8 +285,7 @@ public class CompactionUtils { */ public static Option<Pair<HoodieTimeline, HoodieInstant>> getDeltaCommitsSinceLatestCompaction( HoodieActiveTimeline activeTimeline) { - Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline() - .filterCompletedInstants().lastInstant(); + Option<HoodieInstant> lastCompaction = activeTimeline.getCommitTimeline().filterCompletedInstants().lastInstant(); HoodieTimeline deltaCommits = activeTimeline.getDeltaCommitTimeline(); HoodieInstant latestInstant; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index efdb1baf23d..2cb42af683b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -593,7 +593,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { @Override public Option<String> getLatestCompactionTime() { if (metadataMetaClient != null) { - Option<HoodieInstant> latestCompaction = metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant(); + Option<HoodieInstant> latestCompaction = metadataMetaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants().lastInstant(); if (latestCompaction.isPresent()) { return Option.of(latestCompaction.get().getTimestamp()); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java index 9bbc72289f5..0b90889cfa7 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java @@ -86,7 +86,7 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness { @Test public void checkCommitTimeline() { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty(), "Should be empty commit timeline"); HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, "1"); @@ -95,12 +95,12 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness { // Commit timeline should not auto-reload every time getActiveCommitTimeline(), it should be cached activeTimeline = metaClient.getActiveTimeline(); - activeCommitTimeline = activeTimeline.getCommitTimeline(); + activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty(), "Should be empty commit timeline"); - HoodieInstant completedInstant = HoodieTimeline.getCompletedInstant(instant); activeTimeline = activeTimeline.reload(); - activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieInstant completedInstant = activeTimeline.getCommitsTimeline().getInstantsAsStream().findFirst().get(); + activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertFalse(activeCommitTimeline.empty(), "Should be the 1 commit we made"); assertEquals(completedInstant, activeCommitTimeline.getInstantsAsStream().findFirst().get(), "Commit should be 1"); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index eef515c6ada..588fc114a3e 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -107,7 +107,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { @Test public void testGetPartitionsWithReplaceCommits() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String ts1 = "1"; @@ -146,7 +146,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { @Test public void testGetPartitions() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String olderPartition = "0"; // older partitions that is modified by all cleans @@ -185,7 +185,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { @Test public void testGetPartitionsUnPartitioned() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String partitionPath = ""; @@ -213,7 +213,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { @Test public void testRestoreInstants() throws Exception { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = 
activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); for (int i = 1; i <= 5; i++) { @@ -238,7 +238,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { String extraMetadataKey = "test_key"; String extraMetadataValue1 = "test_value1"; HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey).isPresent()); @@ -616,7 +616,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { @Test public void testGetDroppedPartitions() throws Exception { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitAndReplaceTimeline(); assertTrue(activeCommitTimeline.empty()); String olderPartition = "p1"; // older partitions that will be deleted by clean commit diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index fa2d7558ef5..1d4be5f02c8 100755 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -120,12 +120,16 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { assertStreamEquals( Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete, instant5), timeline.getCommitTimeline().getInstantsAsStream(), "Check the instants 
stream"); + + assertStreamEquals( + Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete, instant5), + timeline.getCommitAndReplaceTimeline().getInstantsAsStream(), "Check the instants stream"); assertStreamEquals( Stream.of(instant1Complete, instant2Complete, instant3Complete, instant4Complete), - timeline.getCommitTimeline().filterCompletedInstants().getInstantsAsStream(), + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().getInstantsAsStream(), "Check the instants stream"); assertStreamEquals(Stream.of(instant5), - timeline.getCommitTimeline().filterPendingExcludingMajorAndMinorCompaction().getInstantsAsStream(), + timeline.getCommitAndReplaceTimeline().filterPendingExcludingMajorAndMinorCompaction().getInstantsAsStream(), "Check the instants stream"); // Backwards compatibility testing for reading compaction plans @@ -174,23 +178,23 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { timeline = new MockHoodieTimeline(Stream.of("01", "03", "05", "07", "09", "11", "13", "15", "17", "19"), Stream.of("21", "23")); assertStreamEquals(Stream.of("05", "07", "09", "11"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsInRange("04", "11") + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsInRange("04", "11") .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsInRange should return 4 instants"); assertStreamEquals(Stream.of("03", "05", "07", "09", "11"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsInClosedRange("03", "11") + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsInClosedRange("03", "11") .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsInClosedRange should return 5 instants"); assertStreamEquals(Stream.of("09", "11"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsAfter("07", 2) + 
timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsAfter("07", 2) .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsAfter 07 should return 2 instants"); assertStreamEquals(Stream.of("01", "03", "05"), - timeline.getCommitTimeline().filterCompletedInstants().findInstantsBefore("07") + timeline.getCommitAndReplaceTimeline().filterCompletedInstants().findInstantsBefore("07") .getInstantsAsStream().map(HoodieInstant::getTimestamp), "findInstantsBefore 07 should return 3 instants"); assertFalse(timeline.empty()); - assertFalse(timeline.getCommitTimeline().filterPendingExcludingMajorAndMinorCompaction().empty()); + assertFalse(timeline.getCommitAndReplaceTimeline().filterPendingExcludingMajorAndMinorCompaction().empty()); assertEquals(12, timeline.countInstants()); assertEquals("01", timeline.firstInstant( HoodieTimeline.COMMIT_ACTION, State.COMPLETED).get().getTimestamp()); @@ -201,7 +205,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { assertFalse(timeline.firstInstant( HoodieTimeline.REPLACE_COMMIT_ACTION, State.COMPLETED).isPresent()); - HoodieTimeline activeCommitTimeline = timeline.getCommitTimeline().filterCompletedInstants(); + HoodieTimeline activeCommitTimeline = timeline.getCommitAndReplaceTimeline().filterCompletedInstants(); assertEquals(10, activeCommitTimeline.countInstants()); assertEquals("01", activeCommitTimeline.firstInstant().get().getTimestamp()); @@ -346,7 +350,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); checkTimeline.accept(timeline.getWriteTimeline(), CollectionUtils.createSet( HoodieTimeline.COMMIT_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieTimeline.COMPACTION_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); - checkTimeline.accept(timeline.getCommitTimeline(), CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, 
HoodieTimeline.REPLACE_COMMIT_ACTION)); + checkTimeline.accept(timeline.getCommitAndReplaceTimeline(), CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION, HoodieTimeline.REPLACE_COMMIT_ACTION)); checkTimeline.accept(timeline.getDeltaCommitTimeline(), Collections.singleton(HoodieTimeline.DELTA_COMMIT_ACTION)); checkTimeline.accept(timeline.getCleanerTimeline(), Collections.singleton(HoodieTimeline.CLEAN_ACTION)); checkTimeline.accept(timeline.getRollbackTimeline(), Collections.singleton(HoodieTimeline.ROLLBACK_ACTION)); @@ -551,12 +555,12 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { public void testReplaceActionsTimeline() { int instantTime = 1; List<HoodieInstant> allInstants = new ArrayList<>(); - HoodieInstant instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); - allInstants.add(instant); - instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); - allInstants.add(instant); - instant = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, String.format("%03d", instantTime++)); - allInstants.add(instant); + HoodieInstant instant1 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant1); + HoodieInstant instant2 = new HoodieInstant(State.COMPLETED, HoodieTimeline.COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant2); + HoodieInstant instant3 = new HoodieInstant(State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, String.format("%03d", instantTime++)); + allInstants.add(instant3); timeline = new HoodieActiveTimeline(metaClient); timeline.setInstants(allInstants); @@ -564,8 +568,16 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness { timeline.getCompletedReplaceTimeline().getInstants(); assertEquals(1, validReplaceInstants.size()); - assertEquals(instant.getTimestamp(), 
validReplaceInstants.get(0).getTimestamp()); + assertEquals(instant3.getTimestamp(), validReplaceInstants.get(0).getTimestamp()); assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, validReplaceInstants.get(0).getAction()); + + assertStreamEquals( + Stream.of(instant1, instant2, instant3), + timeline.getCommitAndReplaceTimeline().getInstantsAsStream(), "Check the instants stream"); + + assertStreamEquals( + Stream.of(instant1, instant2), + timeline.getCommitTimeline().getInstantsAsStream(), "Check the instants stream"); } @Test diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java index 4741cdef1f8..407251c64b2 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java @@ -291,6 +291,59 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { } } + @Test + public void testGetDeltaCommitsSinceLastCompactionWithCompletedReplaceCommits() { + // 4th replace commit. 
+ HoodieActiveTimeline timeline = new MockHoodieActiveTimeline( + Stream.of(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "02"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "03"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "04"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "05"), + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09"))); + + Pair<HoodieTimeline, HoodieInstant> actual = + CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline).get(); + assertEquals( + Stream.of( + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09")) + .collect(Collectors.toList()), + actual.getLeft().getInstants()); + assertEquals( + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "06"), + actual.getRight()); + + // mix of compaction commit and replace commit. 
+ timeline = new MockHoodieActiveTimeline( + Stream.of(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "01"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "02"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "03"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "04"), + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "05"), + new HoodieInstant(false, HoodieTimeline.REPLACE_COMMIT_ACTION, "06"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09"))); + + actual = + CompactionUtils.getDeltaCommitsSinceLatestCompaction(timeline).get(); + assertEquals( + Stream.of( + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "07"), + new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, "08"), + new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "09")) + .collect(Collectors.toList()), + actual.getLeft().getInstants()); + assertEquals( + new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "05"), + actual.getRight()); + } + @Test public void testGetDeltaCommitsSinceLatestCompactionWithEmptyDeltaCommits() { HoodieActiveTimeline timeline = new MockHoodieActiveTimeline(); @@ -386,6 +439,11 @@ public class TestCompactionUtils extends HoodieCommonTestHarness { this.setInstants(new ArrayList<>()); } + public MockHoodieActiveTimeline(Stream<HoodieInstant> instants) { + super(); + setInstants(instants.collect(Collectors.toList())); + } + public MockHoodieActiveTimeline( Stream<String> completedDeltaCommits, Stream<String> completedCompactionCommits, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala index 
2319d40480e..1f523aabc99 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala @@ -54,7 +54,7 @@ class RepairAddpartitionmetaProcedure extends BaseProcedure with ProcedureBuilde val metaClient = createMetaClient(jsc, tablePath) - val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp + val latestCommit: String = metaClient.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp val partitionPaths: util.List[String] = FSUtils.getAllPartitionFoldersThreeLevelsDown(metaClient.getStorage, tablePath); val basePath: StoragePath = new StoragePath(tablePath) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala index b9f43e12e66..292f6d5fdee 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala @@ -72,7 +72,7 @@ class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBu metaClient.getStorage, partition) val baseFormatFile: Option[StoragePath] = HoodiePartitionMetadata.baseFormatMetaPathIfExists( metaClient.getStorage, partition) - val latestCommit: String = metaClient.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp + val latestCommit: String = metaClient.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp var action = if (textFormatFile.isPresent) "MIGRATE" else 
"NONE" if (!dryRun) { if (!baseFormatFile.isPresent) { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala index 4afa328b84a..1a025042f9b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala @@ -68,7 +68,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil .withBasePath(basePath) .withLogFilePaths(logFilePaths.asJava) .withReaderSchema(schema) - .withLatestInstantTime(client.getActiveTimeline.getCommitTimeline.lastInstant.get.getTimestamp) + .withLatestInstantTime(client.getActiveTimeline.getCommitAndReplaceTimeline.lastInstant.get.getTimestamp) .withReverseReader(java.lang.Boolean.parseBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue)) .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue) .withMaxMemorySizeInBytes(HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala index 36be3b14678..5556fd93b33 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/StatsWriteAmplificationProcedure.scala @@ -46,7 +46,7 @@ class StatsWriteAmplificationProcedure extends 
BaseProcedure with ProcedureBuild val basePath = getBasePath(table) val client = createMetaClient(jsc, basePath) val activeTimeline = client.getActiveTimeline - val timeline = activeTimeline.getCommitTimeline.filterCompletedInstants() + val timeline = activeTimeline.getCommitAndReplaceTimeline.filterCompletedInstants() val rows = new java.util.ArrayList[Row] val df = new DecimalFormat("#.00") diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala index 10a10160745..57a17b213b8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ValidateHoodieSyncProcedure.scala @@ -190,7 +190,7 @@ class ValidateHoodieSyncProcedure extends BaseProcedure with ProcedureBuilder wi @throws[IOException] def countNewRecords(target: HoodieTableMetaClient, commitsToCatchup: List[String]): Long = { var totalNew: Long = 0 - val timeline: HoodieTimeline = target.reloadActiveTimeline.getCommitTimeline.filterCompletedInstants + val timeline: HoodieTimeline = target.reloadActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants for (commit <- commitsToCatchup) { val c: HoodieCommitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commit)).get, classOf[HoodieCommitMetadata]) totalNew += c.fetchTotalRecordsWritten - c.fetchTotalUpdateRecordsWritten diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index 086363e447c..d02204dbe9b 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -202,9 +202,9 @@ public class HoodieJavaStreamingApp { HoodieTableMetaClient metaClient = HoodieClientTestUtils.createMetaClient(jssc, tablePath); if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { // Ensure we have successfully completed one compaction commit - ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().countInstants() == 1); + ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitAndReplaceTimeline().countInstants() == 1); } else { - ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().countInstants() >= 1); + ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitAndReplaceTimeline().countInstants() >= 1); } // Deletes Stream diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala index ad017a5a4dc..6e9e2a0a481 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala @@ -177,6 +177,6 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness { } // compaction should have been completed val metaClient = HoodieTestUtils.createMetaClient(new HadoopStorageConfiguration(fs.getConf), basePath) - assertEquals(1, metaClient.getActiveTimeline.getCommitTimeline.countInstants()) + assertEquals(1, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.countInstants()) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 054744109b0..babe1f73acd 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -503,6 +503,6 @@ class TestStructuredStreaming extends HoodieSparkClientTestBase { streamingWrite(inputDF.schema, sourcePath, destPath, opts, id) } val metaClient = HoodieTestUtils.createMetaClient(storage, destPath); - assertTrue(metaClient.getActiveTimeline.getCommitTimeline.empty()) + assertTrue(metaClient.getActiveTimeline.getCommitAndReplaceTimeline.empty()) } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala index cad585b6453..2da80c888dd 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala @@ -28,7 +28,6 @@ import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings} import org.apache.hudi.config.HoodieWriteConfig - import org.apache.avro.generic.GenericRecord import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SaveMode} @@ -333,6 +332,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val inputDF4 = spark.read.json(spark.sparkContext.parallelize(records4, 2)) inputDF4.write.format("org.apache.hudi") .options(options) + .option("hoodie.compact.inline", "false") 
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) .mode(SaveMode.Append) .save(basePath) @@ -357,6 +357,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { .options(options) .option("hoodie.clustering.inline", "true") .option("hoodie.clustering.inline.max.commits", "1") + .option("hoodie.compact.inline", "false") .mode(SaveMode.Append) .save(basePath) val instant5 = metaClient.reloadActiveTimeline.lastInstant().get() @@ -385,6 +386,7 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val inputDF6 = spark.read.json(spark.sparkContext.parallelize(records6, 2)) inputDF6.write.format("org.apache.hudi") .options(options) + .option("hoodie.compact.inline", "false") .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL) .mode(SaveMode.Append) .save(basePath) @@ -407,27 +409,32 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val inputDF7 = spark.read.json(spark.sparkContext.parallelize(records7, 2)) inputDF7.write.format("org.apache.hudi") .options(options) + .option("hoodie.compact.inline", "false") .mode(SaveMode.Append) .save(basePath) + totalInsertedCnt += 7 val records8 = recordsToStrings(dataGen.generateInserts("007", 3)).asScala.toList val inputDF8 = spark.read.json(spark.sparkContext.parallelize(records8, 2)) inputDF8.write.format("org.apache.hudi") .options(options) + .option("hoodie.compact.inline", "false") .mode(SaveMode.Append) .save(basePath) val instant8 = metaClient.reloadActiveTimeline.lastInstant().get() val commitTime8 = instant8.getTimestamp + totalInsertedCnt += 3 // 8. Upsert Operation With Clean Operation - val records9 = recordsToStrings(dataGen.generateUniqueUpdates("008", 30)).asScala.toList - val inputDF9 = spark.read.json(spark.sparkContext.parallelize(records9, 2)) + val inputDF9 = inputDF6.limit(30) // 30 updates to inserts added after insert overwrite table. 
if not for this, updates generated from datagen, + could split as inserts and updates from hudi standpoint due to insert overwrite table operation. inputDF9.write.format("org.apache.hudi") .options(options) .option("hoodie.clean.automatic", "true") - .option("hoodie.keep.min.commits", "4") - .option("hoodie.keep.max.commits", "5") - .option("hoodie.cleaner.commits.retained", "3") + .option("hoodie.keep.min.commits", "16") + .option("hoodie.keep.max.commits", "17") + .option("hoodie.clean.commits.retained", "15") + .option("hoodie.compact.inline", "false") .mode(SaveMode.Append) .save(basePath) val instant9 = metaClient.reloadActiveTimeline.lastInstant().get() @@ -440,13 +447,8 @@ class TestCDCDataFrameSuite extends HoodieCDCTestBase { val updatedCnt9 = 30 - insertedCnt9 assertCDCOpCnt(cdcDataOnly9, insertedCnt9, updatedCnt9, 0) - // here cause we do the clean operation and just remain the commit6 and commit7, so we need to reset the total cnt. - // 70 is the number of inserted records at commit 6.
- totalInsertedCnt = 80 + insertedCnt9 - totalUpdatedCnt = updatedCnt9 - totalDeletedCnt = 0 allVisibleCDCData = cdcDataFrame((commitTime1.toLong - 1).toString) - assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt, totalDeletedCnt) + assertCDCOpCnt(allVisibleCDCData, totalInsertedCnt, totalUpdatedCnt + 30, totalDeletedCnt) } /** diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala index 5675ac4ebe9..672f3308765 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestRepairsProcedure.scala @@ -254,7 +254,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) // get fs and check number of latest files - val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants, + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants, metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPath))) val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList // there should be 3 files @@ -311,7 +311,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) // get fs and check number of latest files - val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants, + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants, 
metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPathWithUpdates))) val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList // there should be 2 files @@ -369,7 +369,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) // get fs and check number of latest files - val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants, + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants, metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPathWithUpserts))) val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList // there should be 3 files @@ -427,7 +427,7 @@ class TestRepairsProcedure extends HoodieSparkProcedureTestBase { metaClient = HoodieTableMetaClient.reload(metaClient) // get fs and check number of latest files - val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitTimeline.filterCompletedInstants, + val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.filterCompletedInstants, metaClient.getStorage.listDirectEntries(new StoragePath(duplicatedPartitionPath))) val filteredStatuses = fsView.getLatestBaseFiles.iterator().asScala.map(value => value.getPath).toList // there should be 3 files diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java index e28b5bdec59..51a8d26754a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java +++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java @@ -645,7 +645,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { static void assertAtleastNCompactionCommits(int minExpected, String tablePath) { HoodieTableMetaClient meta = createMetaClient(storage, tablePath); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); int numCompactionCommits = timeline.countInstants(); assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); @@ -661,7 +661,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) { HoodieTableMetaClient meta = createMetaClient(storage.getConf(), tablePath); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); int numCompactionCommits = timeline.countInstants(); assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index cb30d3dc0be..4da6ef51b62 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -875,7 +875,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // delete compaction commit HoodieTableMetaClient meta = HoodieTestUtils.createMetaClient(storage, tableBasePath); - HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants(); HoodieInstant commitInstant = timeline.lastInstant().get(); String commitFileName = tableBasePath + "/.hoodie/" + commitInstant.getFileName(); fs.delete(new Path(commitFileName), false);
