This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b99e87db0d0e28a937a800201fe2aed52c095501 Author: Y Ethan Guo <[email protected]> AuthorDate: Wed Apr 24 14:41:09 2024 -0700 [HUDI-7650] Remove FileSystem argument in TestHelpers methods (#11072) * [HUDI-7650] Remove FileSystem argument in TestHelpers methods * Fix checkstyle --- .../deltastreamer/HoodieDeltaStreamerTestBase.java | 25 ++-- .../deltastreamer/TestHoodieDeltaStreamer.java | 148 ++++++++++----------- .../TestHoodieDeltaStreamerWithMultiWriter.java | 22 +-- .../offlinejob/HoodieOfflineJobTestBase.java | 7 +- .../offlinejob/TestHoodieClusteringJob.java | 14 +- .../offlinejob/TestHoodieCompactorJob.java | 8 +- 6 files changed, 111 insertions(+), 113 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java index 6b1c09fa7c7..81b5be2ed9e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java @@ -49,7 +49,6 @@ import org.apache.hudi.utilities.streamer.HoodieStreamer; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; import org.apache.avro.Schema; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -635,7 +634,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { return cfg; } - static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) { + static void assertAtleastNCompactionCommits(int minExpected, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -643,7 +642,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); } - static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) { + static void assertAtleastNDeltaCommits(int minExpected, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -651,7 +650,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); } - static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) { + static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -659,7 +658,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertTrue(minExpected <= numCompactionCommits, "Got=" + numCompactionCommits + ", exp >=" + minExpected); } - static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath, FileSystem fs) { + static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String lastSuccessfulCommit, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -667,7 +666,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); } - static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits) + static String assertCommitMetadata(String expected, String tablePath, int totalCommits) throws IOException { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); @@ -696,7 +695,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { res.get(timeoutInSecs, TimeUnit.SECONDS); } - static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) { + static void assertAtLeastNCommits(int minExpected, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -704,7 +703,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); } - static void assertAtLeastNReplaceCommits(int minExpected, String tablePath, FileSystem fs) { + static void assertAtLeastNReplaceCommits(int minExpected, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -712,7 +711,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); } - static void assertPendingIndexCommit(String tablePath, FileSystem fs) { + static void assertPendingIndexCommit(String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); HoodieTimeline timeline = meta.getActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -720,7 +719,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1"); } - static void assertCompletedIndexCommit(String tablePath, FileSystem fs) { + static void assertCompletedIndexCommit(String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); HoodieTimeline timeline = meta.getActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -728,7 +727,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1"); } - static void assertNoReplaceCommits(String tablePath, FileSystem fs) { + static void assertNoReplaceCommits(String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -736,7 +735,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 0); } - static void assertAtLeastNReplaceRequests(int minExpected, String tablePath, FileSystem fs) { + static void assertAtLeastNReplaceRequests(int minExpected, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); HoodieTimeline timeline = meta.getActiveTimeline().filterPendingReplaceTimeline(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -744,7 +743,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", exp >=" + minExpected); } - static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback, int minExpectedCommits, String tablePath, FileSystem fs) { + static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback, int minExpectedCommits, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build(); HoodieTimeline timeline = meta.getActiveTimeline().getRollbackTimeline().filterCompletedInstants(); LOG.info("Rollback Timeline Instants=" + meta.getActiveTimeline().getInstants()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index 14aa3b5d2e9..23fd8bd9e78 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -555,7 +555,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { new HoodieDeltaStreamer(cfg, jsc).sync(); assertRecordCount(expected, tableBasePath, sqlContext); assertDistanceCount(expected, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs, totalCommits); + TestHelpers.assertCommitMetadata(metadata, tableBasePath, totalCommits); } // TODO add tests w/ disabled reconciliation @@ -576,7 +576,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } new HoodieDeltaStreamer(cfg, jsc).sync(); assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); + TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); // Upsert data produced with Schema B, pass Schema B cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()), @@ -591,7 +591,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { new HoodieDeltaStreamer(cfg, jsc).sync(); // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. assertRecordCount(1450, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); + TestHelpers.assertCommitMetadata("00001", tableBasePath, 2); List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); @@ -618,7 +618,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { new HoodieDeltaStreamer(cfg, jsc).sync(); // again, 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. assertRecordCount(1900, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3); + TestHelpers.assertCommitMetadata("00002", tableBasePath, 3); counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); @@ -715,10 +715,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, fs); - TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath); + TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath); } else { - TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath); } assertRecordCount(totalRecords, tableBasePath, sqlContext); assertDistanceCount(totalRecords, tableBasePath, sqlContext); @@ -795,8 +795,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs); - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(2, tableBasePath); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); return true; }); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); @@ -814,7 +814,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); ds.sync(); // assert ingest successful - TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(1, tableBasePath); // schedule a clustering job to build a clustering plan and transition to inflight HoodieClusteringJob clusteringJob = initialHoodieClusteringJob(tableBasePath, null, false, "schedule"); @@ -831,8 +831,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { ds2.sync(); String completeClusteringTimeStamp = meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp(); assertEquals(clusteringRequest.getTimestamp(), completeClusteringTimeStamp); - TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs); - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(2, tableBasePath); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); } @Test @@ -859,8 +859,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext); prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, null, null, dataGenerator, "001"); deltaStreamer.sync(); - TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs); - TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath); + TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath); // delete compaction commit HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build(); @@ -873,7 +873,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, null, dataGenerator, "002"); deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc); deltaStreamer.sync(); - TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath); meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build(); timeline = meta.getActiveTimeline().getRollbackTimeline(); assertEquals(1, timeline.getInstants().size()); @@ -899,12 +899,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.configs.add(String.format("%s=%s", "hoodie.datasource.write.row.writer.enable", "false")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs); + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath); return true; }); - TestHelpers.assertAtLeastNCommits(6, tableBasePath, fs); - TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(6, tableBasePath); + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath); // Step 2 : Get the first replacecommit and extract the corresponding replaced file IDs. HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build(); @@ -1049,7 +1049,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { Collections.singleton(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() + "=true")); deltaStreamerTestRunner(ds, (r) -> { - TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(2, tableBasePath); Option<String> scheduleIndexInstantTime = Option.empty(); try { @@ -1061,13 +1061,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { return false; } if (scheduleIndexInstantTime.isPresent()) { - TestHelpers.assertPendingIndexCommit(tableBasePath, fs); + TestHelpers.assertPendingIndexCommit(tableBasePath); LOG.info("Schedule indexing success, now build index with instant time " + scheduleIndexInstantTime.get()); HoodieIndexer runIndexingJob = new HoodieIndexer(jsc, buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, scheduleIndexInstantTime.get(), EXECUTE, "COLUMN_STATS")); runIndexingJob.start(0); LOG.info("Metadata indexing success"); - TestHelpers.assertCompletedIndexCommit(tableBasePath, fs); + TestHelpers.assertCompletedIndexCommit(tableBasePath); } else { LOG.warn("Metadata indexing failed"); } @@ -1084,7 +1084,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { CountDownLatch countDownLatch = new CountDownLatch(1); deltaStreamerTestRunner(ds, (r) -> { - TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(2, tableBasePath); countDownLatch.countDown(); return true; }); @@ -1105,7 +1105,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { shouldPassInClusteringInstantTime ? scheduleClusteringInstantTime.get() : null, false); HoodieClusteringJob clusterClusteringJob = new HoodieClusteringJob(jsc, clusterClusteringConfig); clusterClusteringJob.cluster(clusterClusteringConfig.retry); - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); LOG.info("Cluster success"); } else { LOG.warn("Clustering execution failed"); @@ -1141,12 +1141,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.configs.add(String.format("%s=%s", "hoodie.merge.allow.duplicate.on.inserts", "false")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); return true; }); // There should be 4 commits, one of which should be a replace commit - TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs); - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(4, tableBasePath); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -1179,12 +1179,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { deltaStreamerTestRunner(ds, cfg, (r) -> { // when pending clustering overlaps w/ incoming, incoming batch will fail and hence will result in rollback. // But eventually the batch should succeed. so, lets check for successful commits after a completed rollback. - assertAtLeastNCommitsAfterRollback(1, 1, tableBasePath, fs); + assertAtLeastNCommitsAfterRollback(1, 1, tableBasePath); return true; }); // There should be 4 commits, one of which should be a replace commit - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); - TestHelpers.assertAtLeastNCommits(3, tableBasePath, fs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); + TestHelpers.assertAtLeastNCommits(3, tableBasePath); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -1204,13 +1204,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.configs.add(String.format("%s=%s", "hoodie.merge.allow.duplicate.on.inserts", "false")); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs); - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); return true; }); // There should be 4 commits, one of which should be a replace commit - TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs); - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(4, tableBasePath); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -1232,7 +1232,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { ds.sync(); // assert ingest successful - TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(1, tableBasePath); // schedule a clustering job to build a clustering plan HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath, null, false, "schedule"); @@ -1273,7 +1273,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { deltaStreamerTestRunner(ds, (r) -> { Exception exception = null; - TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(2, tableBasePath); try { int result = scheduleClusteringJob.cluster(0); if (result == 0) { @@ -1293,16 +1293,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { } switch (runningMode.toLowerCase()) { case SCHEDULE_AND_EXECUTE: { - TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs); + TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath); return true; } case SCHEDULE: { - TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, fs); - TestHelpers.assertNoReplaceCommits(tableBasePath, fs); + TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath); + TestHelpers.assertNoReplaceCommits(tableBasePath); return true; } case EXECUTE: { - TestHelpers.assertNoReplaceCommits(tableBasePath, fs); + TestHelpers.assertNoReplaceCommits(tableBasePath); return true; } default: @@ -1469,12 +1469,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // trigger continuous DS and wait until 1 replace commit is complete. try { deltaStreamerTestRunner(ds, cfg, (r) -> { - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); return true; }); // There should be 4 commits, one of which should be a replace commit - TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs); - TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(4, tableBasePath); + TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath); } finally { // clean up resources ds.shutdownGracefully(); @@ -1505,7 +1505,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertRecordCount(1000, tableBasePath, sqlContext); assertDistanceCount(1000, tableBasePath, sqlContext); assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext); - String lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); + String lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); // Now incrementally pull from the above hudi table and ingest to second table HoodieDeltaStreamer.Config downstreamCfg = @@ -1516,7 +1516,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertRecordCount(1000, downstreamTableBasePath, sqlContext); assertDistanceCount(1000, downstreamTableBasePath, sqlContext); assertDistanceCountWithExactValue(1000, downstreamTableBasePath, sqlContext); - TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, fs, 1); + TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, 1); // No new data => no commits for upstream table cfg.sourceLimit = 0; @@ -1524,7 +1524,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertRecordCount(1000, tableBasePath, sqlContext); assertDistanceCount(1000, tableBasePath, sqlContext); assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); + TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); // with no change in upstream table, no change in downstream too when pulled. HoodieDeltaStreamer.Config downstreamCfg1 = @@ -1534,7 +1534,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertRecordCount(1000, downstreamTableBasePath, sqlContext); assertDistanceCount(1000, downstreamTableBasePath, sqlContext); assertDistanceCountWithExactValue(1000, downstreamTableBasePath, sqlContext); - TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, fs, 1); + TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, 1); // upsert() #1 on upstream hudi table cfg.sourceLimit = 2000; @@ -1543,7 +1543,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertRecordCount(1950, tableBasePath, sqlContext); assertDistanceCount(1950, tableBasePath, sqlContext); assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext); - lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); + lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", tableBasePath, 2); List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); @@ -1558,7 +1558,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertDistanceCount(2000, downstreamTableBasePath, sqlContext); assertDistanceCountWithExactValue(2000, downstreamTableBasePath, sqlContext); String finalInstant = - TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, fs, 2); + TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamTableBasePath, 2); counts = countsPerCommit(downstreamTableBasePath, sqlContext); assertEquals(2000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); @@ -1670,7 +1670,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); new HoodieDeltaStreamer(cfg, jsc).sync(); assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); + TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); // Generate the same 1000 records + 1000 new ones for upsert cfg.filterDupes = true; @@ -1678,7 +1678,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.operation = WriteOperationType.INSERT; new HoodieDeltaStreamer(cfg, jsc).sync(); assertRecordCount(2000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); + TestHelpers.assertCommitMetadata("00001", tableBasePath, 2); // 1000 records for commit 00000 & 1000 for commit 00001 List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1000, counts.get(0).getLong(1)); @@ -2464,7 +2464,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(deltaStreamer, cfg, (r) -> { - TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + ((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + ((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath); assertRecordCount(numRecords, tableBasePath, sqlContext); return true; }); @@ -2593,7 +2593,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { new HoodieDeltaStreamer(cfg, jsc).sync(); assertRecordCount(1000, tableBasePath, sqlContext); assertDistanceCount(1000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); + TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); // Collect the fileIds before running HoodieDeltaStreamer Set<String> beforeFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty()); @@ -2607,12 +2607,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { if (operationType == WriteOperationType.INSERT_OVERWRITE) { assertRecordCount(1000, tableBasePath, sqlContext); assertDistanceCount(1000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); + TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build(); final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline()); assertEquals(0, fsView.getLatestFileSlices("").count()); - TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); + TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); // Since the table has been overwritten all fileIDs before should have been replaced Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty()); @@ -2623,7 +2623,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { new HoodieDeltaStreamer(cfg, jsc).sync(); assertRecordCount(950, tableBasePath, sqlContext); assertDistanceCount(950, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); + TestHelpers.assertCommitMetadata("00001", tableBasePath, 2); UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); } @@ -2671,7 +2671,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); ds.sync(); // assert ingest successful - TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(1, tableBasePath); TableSchemaResolver tableSchemaResolver = new TableSchemaResolver( HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build()); @@ -2713,8 +2713,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT); new HoodieDeltaStreamer(cfg, jsc).sync(); assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); - TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); + TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); + TestHelpers.assertAtLeastNCommits(1, tableBasePath); // change cow to mor HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() @@ -2736,24 +2736,24 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { new HoodieDeltaStreamer(cfg, jsc).sync(); // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. assertRecordCount(1450, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2); + TestHelpers.assertCommitMetadata("00001", tableBasePath, 2); List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); - TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(1, tableBasePath); // currently there should be 1 deltacommits now - TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath); // test the table type is already mor new HoodieDeltaStreamer(cfg, jsc).sync(); // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. // total records should be 1900 now assertRecordCount(1900, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3); + TestHelpers.assertCommitMetadata("00002", tableBasePath, 3); counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); - TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(1, tableBasePath); // currently there should be 2 deltacommits now - TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath); // clean up UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); @@ -2767,8 +2767,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); new HoodieDeltaStreamer(cfg, jsc).sync(); assertRecordCount(1000, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1); - TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs); + TestHelpers.assertCommitMetadata("00000", tableBasePath, 1); + TestHelpers.assertAtLeastNCommits(1, tableBasePath); // sync once, make one deltacommit and do a full compaction cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); @@ -2780,12 +2780,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { assertRecordCount(1450, tableBasePath, sqlContext); // totalCommits: 1 deltacommit(bulk_insert) + 1 deltacommit(upsert) + 1 commit(compaction) // there is no checkpoint in the compacted commit metadata, the latest checkpoint 00001 is in the upsert deltacommit - TestHelpers.assertCommitMetadata(null, tableBasePath, fs, 3); + TestHelpers.assertCommitMetadata(null, tableBasePath, 3); List<Row> counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1450, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); - TestHelpers.assertAtLeastNCommits(3, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(3, tableBasePath); // currently there should be 2 deltacommits now - TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath); // change mor to cow HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() @@ -2808,20 +2808,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. assertRecordCount(1900, tableBasePath, sqlContext); // the checkpoint now should be 00002 - TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 4); + TestHelpers.assertCommitMetadata("00002", tableBasePath, 4); counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(1900, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); - TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(4, tableBasePath); // test the table type is already cow new HoodieDeltaStreamer(cfg, jsc).sync(); // out of 1000 new records, 500 are inserts, 450 are updates and 50 are deletes. // total records should be 2350 now assertRecordCount(2350, tableBasePath, sqlContext); - TestHelpers.assertCommitMetadata("00003", tableBasePath, fs, 5); + TestHelpers.assertCommitMetadata("00003", tableBasePath, 5); counts = countsPerCommit(tableBasePath, sqlContext); assertEquals(2350, counts.stream().mapToLong(entry -> entry.getLong(1)).sum()); - TestHelpers.assertAtLeastNCommits(5, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(5, tableBasePath); // clean up UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath); @@ -2867,7 +2867,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.configs.add(String.format("%s=%s", UPSERT_PARALLELISM_VALUE.key(), upsertParallelism)); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { - TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs); + TestHelpers.assertAtLeastNCommits(2, tableBasePath); // make sure the UPSERT_PARALLELISM_VALUE already changed (hot updated) Assertions.assertTrue(((HoodieStreamer.StreamSyncService) ds.getIngestionService()).getProps().getLong(UPSERT_PARALLELISM_VALUE.key()) > upsertParallelism); return true; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java index 4df68b9fbe9..526fc11a6bd 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java @@ -121,10 +121,10 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerT // Prepare base dataset with some commits deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs); - TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath); + TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath); } else { - TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath); } assertRecordCount(totalRecords, tableBasePath, sqlContext); assertDistanceCount(totalRecords, tableBasePath, sqlContext); @@ -188,10 +188,10 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerT // Prepare base dataset with some commits deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs); - TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath); + TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath); } else { - TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath); } assertRecordCount(totalRecords, tableBasePath, sqlContext); assertDistanceCount(totalRecords, tableBasePath, sqlContext); @@ -262,10 +262,10 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerT // Prepare base dataset with some commits deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs); - TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath); + TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath); } else { - TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath); } assertRecordCount(totalRecords, tableBasePath, sqlContext); assertDistanceCount(totalRecords, tableBasePath, sqlContext); @@ -394,9 +394,9 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerT // Condition for parallel ingestion job Function<Boolean, Boolean> conditionForRegularIngestion = (r) -> { if (tableType.equals(HoodieTableType.MERGE_ON_READ)) { - TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs); + TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath); } else { - TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath, fs); + TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, lastSuccessfulCommit, tableBasePath); } assertRecordCount(totalRecords, tableBasePath, sqlContext); assertDistanceCount(totalRecords, tableBasePath, sqlContext); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java index 33615cdddee..6feb344af7e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java @@ -29,7 +29,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; -import org.apache.hadoop.fs.FileSystem; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -107,7 +106,7 @@ public class HoodieOfflineJobTestBase extends UtilitiesTestBase { // Inner Class // ------------------------------------------------------------------------- static class TestHelpers { - static void assertNCompletedCommits(int expected, String tablePath, FileSystem fs) { + static void assertNCompletedCommits(int expected, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getWriteTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -115,7 +114,7 @@ public class HoodieOfflineJobTestBase extends UtilitiesTestBase { assertEquals(expected, numCommits, "Got=" + numCommits + ", exp =" + expected); } - static void assertNCleanCommits(int expected, String tablePath, FileSystem fs) { + static void assertNCleanCommits(int expected, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCleanerTimeline().filterCompletedInstants(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); @@ -123,7 +122,7 @@ public class HoodieOfflineJobTestBase extends UtilitiesTestBase { assertEquals(expected, numCleanCommits, "Got=" + numCleanCommits + ", exp =" + expected); } - static void assertNClusteringCommits(int expected, String tablePath, FileSystem fs) { + static void assertNClusteringCommits(int expected, String tablePath) { HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build(); HoodieTimeline timeline = meta.getActiveTimeline().getCompletedReplaceTimeline(); LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants()); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java index c6ed0c698ff..e77c90ec034 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java @@ -71,8 +71,8 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase { HoodieClusteringJob hoodieCluster = init(tableBasePath, true, "scheduleAndExecute", false); hoodieCluster.cluster(0); - HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath, fs); - HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath, fs); + HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath); + HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath); writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true); writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true); @@ -81,8 +81,8 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase { hoodieCluster = init(tableBasePath, true, "scheduleAndExecute", true); hoodieCluster.cluster(0); - HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(2, tableBasePath, fs); - HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath, fs); + HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(2, tableBasePath); + HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath); } @Test @@ -107,8 +107,8 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase { HoodieClusteringJob hoodieCluster = init(tableBasePath, true, "scheduleAndExecute", false); hoodieCluster.cluster(0); - HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath, fs); - HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath, fs); + HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, tableBasePath); + HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath); // remove the completed instant from timeline and trigger purge of pending clustering instant. HoodieInstant latestClusteringInstant = metaClient.getActiveTimeline() @@ -121,7 +121,7 @@ public class TestHoodieClusteringJob extends HoodieOfflineJobTestBase { getClusteringConfigForPurge(tableBasePath, true, PURGE_PENDING_INSTANT, false, latestClusteringInstant.getTimestamp()); hoodieCluster.cluster(0); // validate that there are no clustering commits in timeline. - HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(0, tableBasePath, fs); + HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(0, tableBasePath); // validate that no records match the clustering instant. String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java index 689d76f5525..8fbb3210a71 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java @@ -87,8 +87,8 @@ public class TestHoodieCompactorJob extends HoodieOfflineJobTestBase { HoodieCompactor hoodieCompactorSchedule = init(tableBasePath, true, "SCHEDULE", false); hoodieCompactorSchedule.compact(0); - TestHelpers.assertNCompletedCommits(2, tableBasePath, fs); - TestHelpers.assertNCleanCommits(0, tableBasePath, fs); + TestHelpers.assertNCompletedCommits(2, tableBasePath); + TestHelpers.assertNCleanCommits(0, tableBasePath); writeData(true, HoodieActiveTimeline.createNewInstantTime(), 100, true); writeData(true, HoodieActiveTimeline.createNewInstantTime(), 100, true); @@ -97,8 +97,8 @@ public class TestHoodieCompactorJob extends HoodieOfflineJobTestBase { HoodieCompactor hoodieCompactorExecute = init(tableBasePath, false, "EXECUTE", true); hoodieCompactorExecute.compact(0); - TestHelpers.assertNCompletedCommits(5, tableBasePath, fs); - TestHelpers.assertNCleanCommits(1, tableBasePath, fs); + TestHelpers.assertNCompletedCommits(5, tableBasePath); + TestHelpers.assertNCleanCommits(1, tableBasePath); } // -------------------------------------------------------------------------
