This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b99e87db0d0e28a937a800201fe2aed52c095501
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Apr 24 14:41:09 2024 -0700

    [HUDI-7650] Remove FileSystem argument in TestHelpers methods (#11072)
    
    * [HUDI-7650] Remove FileSystem argument in TestHelpers methods
    
    * Fix checkstyle
---
 .../deltastreamer/HoodieDeltaStreamerTestBase.java |  25 ++--
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 148 ++++++++++-----------
 .../TestHoodieDeltaStreamerWithMultiWriter.java    |  22 +--
 .../offlinejob/HoodieOfflineJobTestBase.java       |   7 +-
 .../offlinejob/TestHoodieClusteringJob.java        |  14 +-
 .../offlinejob/TestHoodieCompactorJob.java         |   8 +-
 6 files changed, 111 insertions(+), 113 deletions(-)

diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 6b1c09fa7c7..81b5be2ed9e 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -49,7 +49,6 @@ import org.apache.hudi.utilities.streamer.HoodieStreamer;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -635,7 +634,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       return cfg;
     }
 
-    static void assertAtleastNCompactionCommits(int minExpected, String 
tablePath, FileSystem fs) {
+    static void assertAtleastNCompactionCommits(int minExpected, String 
tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -643,7 +642,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertTrue(minExpected <= numCompactionCommits, "Got=" + 
numCompactionCommits + ", exp >=" + minExpected);
     }
 
-    static void assertAtleastNDeltaCommits(int minExpected, String tablePath, 
FileSystem fs) {
+    static void assertAtleastNDeltaCommits(int minExpected, String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -651,7 +650,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
     }
 
-    static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, 
String lastSuccessfulCommit, String tablePath, FileSystem fs) {
+    static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, 
String lastSuccessfulCommit, String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -659,7 +658,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertTrue(minExpected <= numCompactionCommits, "Got=" + 
numCompactionCommits + ", exp >=" + minExpected);
     }
 
-    static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String 
lastSuccessfulCommit, String tablePath, FileSystem fs) {
+    static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String 
lastSuccessfulCommit, String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       HoodieTimeline timeline = 
meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -667,7 +666,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
     }
 
-    static String assertCommitMetadata(String expected, String tablePath, 
FileSystem fs, int totalCommits)
+    static String assertCommitMetadata(String expected, String tablePath, int 
totalCommits)
         throws IOException {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
@@ -696,7 +695,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       res.get(timeoutInSecs, TimeUnit.SECONDS);
     }
 
-    static void assertAtLeastNCommits(int minExpected, String tablePath, 
FileSystem fs) {
+    static void assertAtLeastNCommits(int minExpected, String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -704,7 +703,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
     }
 
-    static void assertAtLeastNReplaceCommits(int minExpected, String 
tablePath, FileSystem fs) {
+    static void assertAtLeastNReplaceCommits(int minExpected, String 
tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getCompletedReplaceTimeline();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -712,7 +711,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
     }
 
-    static void assertPendingIndexCommit(String tablePath, FileSystem fs) {
+    static void assertPendingIndexCommit(String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -720,7 +719,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
     }
 
-    static void assertCompletedIndexCommit(String tablePath, FileSystem fs) {
+    static void assertCompletedIndexCommit(String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -728,7 +727,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
     }
 
-    static void assertNoReplaceCommits(String tablePath, FileSystem fs) {
+    static void assertNoReplaceCommits(String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getCompletedReplaceTimeline();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -736,7 +735,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 
0);
     }
 
-    static void assertAtLeastNReplaceRequests(int minExpected, String 
tablePath, FileSystem fs) {
+    static void assertAtLeastNReplaceRequests(int minExpected, String 
tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().filterPendingReplaceTimeline();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -744,7 +743,7 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
       assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
     }
 
-    static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback, 
int minExpectedCommits, String tablePath, FileSystem fs) {
+    static void assertAtLeastNCommitsAfterRollback(int minExpectedRollback, 
int minExpectedCommits, String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getRollbackTimeline().filterCompletedInstants();
       LOG.info("Rollback Timeline Instants=" + 
meta.getActiveTimeline().getInstants());
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 14aa3b5d2e9..23fd8bd9e78 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -555,7 +555,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(expected, tableBasePath, sqlContext);
     assertDistanceCount(expected, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs, 
totalCommits);
+    TestHelpers.assertCommitMetadata(metadata, tableBasePath, totalCommits);
   }
 
   // TODO add tests w/ disabled reconciliation
@@ -576,7 +576,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
 
     // Upsert data produced with Schema B, pass Schema B
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()),
@@ -591,7 +591,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     assertRecordCount(1450, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
     List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1450, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
@@ -618,7 +618,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // again, 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     assertRecordCount(1900, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, 3);
     counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
@@ -715,10 +715,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, fs);
-        TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);
+        TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath);
+        TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath);
       } else {
-        TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath);
       }
       assertRecordCount(totalRecords, tableBasePath, sqlContext);
       assertDistanceCount(totalRecords, tableBasePath, sqlContext);
@@ -795,8 +795,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
-      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
       return true;
     });
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
@@ -814,7 +814,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
     // assert ingest successful
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath);
 
     // schedule a clustering job to build a clustering plan and transition to 
inflight
     HoodieClusteringJob clusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
@@ -831,8 +831,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     ds2.sync();
     String completeClusteringTimeStamp = 
meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
     assertEquals(clusteringRequest.getTimestamp(), 
completeClusteringTimeStamp);
-    TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
-    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(2, tableBasePath);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
   }
 
   @Test
@@ -859,8 +859,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
     prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null, dataGenerator, "001");
     deltaStreamer.sync();
-    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
-    TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath);
+    TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath);
 
     // delete compaction commit
     HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
@@ -873,7 +873,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false, 
null, null, dataGenerator, "002");
     deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
     deltaStreamer.sync();
-    TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
+    TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath);
     meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
     timeline = meta.getActiveTimeline().getRollbackTimeline();
     assertEquals(1, timeline.getInstants().size());
@@ -899,12 +899,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
+      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath);
       return true;
     });
 
-    TestHelpers.assertAtLeastNCommits(6, tableBasePath, fs);
-    TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(6, tableBasePath);
+    TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath);
 
     // Step 2 : Get the first replacecommit and extract the corresponding 
replaced file IDs.
     HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
@@ -1049,7 +1049,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         
Collections.singleton(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()
 + "=true"));
 
     deltaStreamerTestRunner(ds, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath);
 
       Option<String> scheduleIndexInstantTime = Option.empty();
       try {
@@ -1061,13 +1061,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         return false;
       }
       if (scheduleIndexInstantTime.isPresent()) {
-        TestHelpers.assertPendingIndexCommit(tableBasePath, fs);
+        TestHelpers.assertPendingIndexCommit(tableBasePath);
         LOG.info("Schedule indexing success, now build index with instant time 
" + scheduleIndexInstantTime.get());
         HoodieIndexer runIndexingJob = new HoodieIndexer(jsc,
             buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, 
scheduleIndexInstantTime.get(), EXECUTE, "COLUMN_STATS"));
         runIndexingJob.start(0);
         LOG.info("Metadata indexing success");
-        TestHelpers.assertCompletedIndexCommit(tableBasePath, fs);
+        TestHelpers.assertCompletedIndexCommit(tableBasePath);
       } else {
         LOG.warn("Metadata indexing failed");
       }
@@ -1084,7 +1084,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     CountDownLatch countDownLatch = new CountDownLatch(1);
 
     deltaStreamerTestRunner(ds, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath);
       countDownLatch.countDown();
       return true;
     });
@@ -1105,7 +1105,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             shouldPassInClusteringInstantTime ? 
scheduleClusteringInstantTime.get() : null, false);
         HoodieClusteringJob clusterClusteringJob = new 
HoodieClusteringJob(jsc, clusterClusteringConfig);
         clusterClusteringJob.cluster(clusterClusteringConfig.retry);
-        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
         LOG.info("Cluster success");
       } else {
         LOG.warn("Clustering execution failed");
@@ -1141,12 +1141,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", 
"hoodie.merge.allow.duplicate.on.inserts", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
       return true;
     });
     // There should be 4 commits, one of which should be a replace commit
-    TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
-    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(4, tableBasePath);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
     assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext);
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
@@ -1179,12 +1179,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       // when pending clustering overlaps w/ incoming, incoming batch will 
fail and hence will result in rollback.
       // But eventually the batch should succeed. so, lets check for 
successful commits after a completed rollback.
-      assertAtLeastNCommitsAfterRollback(1, 1, tableBasePath, fs);
+      assertAtLeastNCommitsAfterRollback(1, 1, tableBasePath);
       return true;
     });
     // There should be 4 commits, one of which should be a replace commit
-    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
-    TestHelpers.assertAtLeastNCommits(3, tableBasePath, fs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
+    TestHelpers.assertAtLeastNCommits(3, tableBasePath);
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
@@ -1204,13 +1204,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", 
"hoodie.merge.allow.duplicate.on.inserts", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);
-      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+      TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
       return true;
     });
     // There should be 4 commits, one of which should be a replace commit
-    TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
-    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(4, tableBasePath);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
     assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext);
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
@@ -1232,7 +1232,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     ds.sync();
 
     // assert ingest successful
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath);
 
     // schedule a clustering job to build a clustering plan
     HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath, 
null, false, "schedule");
@@ -1273,7 +1273,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     deltaStreamerTestRunner(ds, (r) -> {
       Exception exception = null;
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath);
       try {
         int result = scheduleClusteringJob.cluster(0);
         if (result == 0) {
@@ -1293,16 +1293,16 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       }
       switch (runningMode.toLowerCase()) {
         case SCHEDULE_AND_EXECUTE: {
-          TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
+          TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath);
           return true;
         }
         case SCHEDULE: {
-          TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, fs);
-          TestHelpers.assertNoReplaceCommits(tableBasePath, fs);
+          TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath);
+          TestHelpers.assertNoReplaceCommits(tableBasePath);
           return true;
         }
         case EXECUTE: {
-          TestHelpers.assertNoReplaceCommits(tableBasePath, fs);
+          TestHelpers.assertNoReplaceCommits(tableBasePath);
           return true;
         }
         default:
@@ -1469,12 +1469,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // trigger continuous DS and wait until 1 replace commit is complete.
     try {
       deltaStreamerTestRunner(ds, cfg, (r) -> {
-        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
         return true;
       });
       // There should be 4 commits, one of which should be a replace commit
-      TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
-      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
+      TestHelpers.assertAtLeastNCommits(4, tableBasePath);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath);
     } finally {
       // clean up resources
       ds.shutdownGracefully();
@@ -1505,7 +1505,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(1000, tableBasePath, sqlContext);
     assertDistanceCount(1000, tableBasePath, sqlContext);
     assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
-    String lastInstantForUpstreamTable = 
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    String lastInstantForUpstreamTable = 
TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
 
     // Now incrementally pull from the above hudi table and ingest to second 
table
     HoodieDeltaStreamer.Config downstreamCfg =
@@ -1516,7 +1516,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(1000, downstreamTableBasePath, sqlContext);
     assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
     assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
-    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
+    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, 1);
 
     // No new data => no commits for upstream table
     cfg.sourceLimit = 0;
@@ -1524,7 +1524,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(1000, tableBasePath, sqlContext);
     assertDistanceCount(1000, tableBasePath, sqlContext);
     assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
 
     // with no change in upstream table, no change in downstream too when 
pulled.
     HoodieDeltaStreamer.Config downstreamCfg1 =
@@ -1534,7 +1534,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(1000, downstreamTableBasePath, sqlContext);
     assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
     assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
-    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
+    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, 1);
 
     // upsert() #1 on upstream hudi table
     cfg.sourceLimit = 2000;
@@ -1543,7 +1543,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(1950, tableBasePath, sqlContext);
     assertDistanceCount(1950, tableBasePath, sqlContext);
     assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext);
-    lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", 
tableBasePath, fs, 2);
+    lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", 
tableBasePath, 2);
     List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
@@ -1558,7 +1558,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
     assertDistanceCountWithExactValue(2000, downstreamTableBasePath, 
sqlContext);
     String finalInstant =
-        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 2);
+        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, 2);
     counts = countsPerCommit(downstreamTableBasePath, sqlContext);
     assertEquals(2000, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
@@ -1670,7 +1670,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
 
     // Generate the same 1000 records + 1000 new ones for upsert
     cfg.filterDupes = true;
@@ -1678,7 +1678,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.operation = WriteOperationType.INSERT;
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(2000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
     // 1000 records for commit 00000 & 1000 for commit 00001
     List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1000, counts.get(0).getLong(1));
@@ -2464,7 +2464,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
       HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
       deltaStreamerTestRunner(deltaStreamer, cfg, (r) -> {
-        TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + 
((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + 
((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath);
         assertRecordCount(numRecords, tableBasePath, sqlContext);
         return true;
       });
@@ -2593,7 +2593,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(1000, tableBasePath, sqlContext);
     assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
 
     // Collect the fileIds before running HoodieDeltaStreamer
     Set<String> beforeFileIDs = getAllFileIDsInTable(tableBasePath, 
Option.empty());
@@ -2607,12 +2607,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     if (operationType == WriteOperationType.INSERT_OVERWRITE) {
       assertRecordCount(1000, tableBasePath, sqlContext);
       assertDistanceCount(1000, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+      TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
     } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
       HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
       final HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, 
metaClient.getCommitsAndCompactionTimeline());
       assertEquals(0, fsView.getLatestFileSlices("").count());
-      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+      TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
 
       // Since the table has been overwritten all fileIDs before should have 
been replaced
       Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, 
Option.empty());
@@ -2623,7 +2623,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(950, tableBasePath, sqlContext);
     assertDistanceCount(950, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
@@ -2671,7 +2671,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
     // assert ingest successful
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath);
 
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
         
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
@@ -2713,8 +2713,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath);
 
     // change cow to mor
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
@@ -2736,24 +2736,24 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     assertRecordCount(1450, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
     List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1450, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath);
     // currently there should be 1 deltacommits now
-    TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath);
 
     // test the table type is already mor
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     // total records should be 1900 now
     assertRecordCount(1900, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, 3);
     counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath);
     // currently there should be 2 deltacommits now
-    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath);
 
     // clean up
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
@@ -2767,8 +2767,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath);
 
     // sync once, make one deltacommit and do a full compaction
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
@@ -2780,12 +2780,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertRecordCount(1450, tableBasePath, sqlContext);
     // totalCommits: 1 deltacommit(bulk_insert) + 1 deltacommit(upsert) + 1 
commit(compaction)
     // there is no checkpoint in the compacted commit metadata, the latest 
checkpoint 00001 is in the upsert deltacommit
-    TestHelpers.assertCommitMetadata(null, tableBasePath, fs, 3);
+    TestHelpers.assertCommitMetadata(null, tableBasePath, 3);
     List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1450, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-    TestHelpers.assertAtLeastNCommits(3, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(3, tableBasePath);
     // currently there should be 2 deltacommits now
-    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath);
 
     // change mor to cow
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
@@ -2808,20 +2808,20 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     assertRecordCount(1900, tableBasePath, sqlContext);
     // the checkpoint now should be 00002
-    TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 4);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, 4);
     counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-    TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(4, tableBasePath);
 
     // test the table type is already cow
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     // total records should be 2350 now
     assertRecordCount(2350, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00003", tableBasePath, fs, 5);
+    TestHelpers.assertCommitMetadata("00003", tableBasePath, 5);
     counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(2350, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-    TestHelpers.assertAtLeastNCommits(5, tableBasePath, fs);
+    TestHelpers.assertAtLeastNCommits(5, tableBasePath);
 
     // clean up
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
@@ -2867,7 +2867,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", UPSERT_PARALLELISM_VALUE.key(), 
upsertParallelism));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath);
       // make sure the UPSERT_PARALLELISM_VALUE already changed (hot updated)
       Assertions.assertTrue(((HoodieStreamer.StreamSyncService) 
ds.getIngestionService()).getProps().getLong(UPSERT_PARALLELISM_VALUE.key()) > 
upsertParallelism);
       return true;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index 4df68b9fbe9..526fc11a6bd 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -121,10 +121,10 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends HoodieDeltaStreamerT
     // Prepare base dataset with some commits
     deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
-        TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
+        TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath);
+        TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath);
       } else {
-        TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath);
       }
       assertRecordCount(totalRecords, tableBasePath, sqlContext);
       assertDistanceCount(totalRecords, tableBasePath, sqlContext);
@@ -188,10 +188,10 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends HoodieDeltaStreamerT
     // Prepare base dataset with some commits
     deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
-        TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
+        TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath);
+        TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath);
       } else {
-        TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath);
       }
       assertRecordCount(totalRecords, tableBasePath, sqlContext);
       assertDistanceCount(totalRecords, tableBasePath, sqlContext);
@@ -262,10 +262,10 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends HoodieDeltaStreamerT
     // Prepare base dataset with some commits
     deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
-        TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
+        TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath);
+        TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath);
       } else {
-        TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath);
       }
       assertRecordCount(totalRecords, tableBasePath, sqlContext);
       assertDistanceCount(totalRecords, tableBasePath, sqlContext);
@@ -394,9 +394,9 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends 
HoodieDeltaStreamerT
     // Condition for parallel ingestion job
     Function<Boolean, Boolean> conditionForRegularIngestion = (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, 
lastSuccessfulCommit, tableBasePath, fs);
+        TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, 
lastSuccessfulCommit, tableBasePath);
       } else {
-        TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, 
lastSuccessfulCommit, tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, 
lastSuccessfulCommit, tableBasePath);
       }
       assertRecordCount(totalRecords, tableBasePath, sqlContext);
       assertDistanceCount(totalRecords, tableBasePath, sqlContext);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
index 33615cdddee..6feb344af7e 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/HoodieOfflineJobTestBase.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -107,7 +106,7 @@ public class HoodieOfflineJobTestBase extends 
UtilitiesTestBase {
   //  Inner Class
   // -------------------------------------------------------------------------
   static class TestHelpers {
-    static void assertNCompletedCommits(int expected, String tablePath, 
FileSystem fs) {
+    static void assertNCompletedCommits(int expected, String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getWriteTimeline().filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -115,7 +114,7 @@ public class HoodieOfflineJobTestBase extends 
UtilitiesTestBase {
       assertEquals(expected, numCommits, "Got=" + numCommits + ", exp =" + 
expected);
     }
 
-    static void assertNCleanCommits(int expected, String tablePath, FileSystem 
fs) {
+    static void assertNCleanCommits(int expected, String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getCleanerTimeline().filterCompletedInstants();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
@@ -123,7 +122,7 @@ public class HoodieOfflineJobTestBase extends 
UtilitiesTestBase {
       assertEquals(expected, numCleanCommits, "Got=" + numCleanCommits + ", 
exp =" + expected);
     }
 
-    static void assertNClusteringCommits(int expected, String tablePath, 
FileSystem fs) {
+    static void assertNClusteringCommits(int expected, String tablePath) {
       HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       HoodieTimeline timeline = 
meta.getActiveTimeline().getCompletedReplaceTimeline();
       LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
index c6ed0c698ff..e77c90ec034 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieClusteringJob.java
@@ -71,8 +71,8 @@ public class TestHoodieClusteringJob extends 
HoodieOfflineJobTestBase {
     HoodieClusteringJob hoodieCluster =
         init(tableBasePath, true, "scheduleAndExecute", false);
     hoodieCluster.cluster(0);
-    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, 
tableBasePath, fs);
-    HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath, 
fs);
+    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, 
tableBasePath);
+    HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath);
 
     writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true);
     writeData(false, HoodieActiveTimeline.createNewInstantTime(), 100, true);
@@ -81,8 +81,8 @@ public class TestHoodieClusteringJob extends 
HoodieOfflineJobTestBase {
     hoodieCluster =
         init(tableBasePath, true, "scheduleAndExecute", true);
     hoodieCluster.cluster(0);
-    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(2, 
tableBasePath, fs);
-    HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath, 
fs);
+    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(2, 
tableBasePath);
+    HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(1, tableBasePath);
   }
 
   @Test
@@ -107,8 +107,8 @@ public class TestHoodieClusteringJob extends 
HoodieOfflineJobTestBase {
     HoodieClusteringJob hoodieCluster =
         init(tableBasePath, true, "scheduleAndExecute", false);
     hoodieCluster.cluster(0);
-    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, 
tableBasePath, fs);
-    HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath, 
fs);
+    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(1, 
tableBasePath);
+    HoodieOfflineJobTestBase.TestHelpers.assertNCleanCommits(0, tableBasePath);
 
     // remove the completed instant from timeline and trigger purge of pending 
clustering instant.
     HoodieInstant latestClusteringInstant = metaClient.getActiveTimeline()
@@ -121,7 +121,7 @@ public class TestHoodieClusteringJob extends 
HoodieOfflineJobTestBase {
         getClusteringConfigForPurge(tableBasePath, true, 
PURGE_PENDING_INSTANT, false, latestClusteringInstant.getTimestamp());
     hoodieCluster.cluster(0);
     // validate that there are no clustering commits in timeline.
-    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(0, 
tableBasePath, fs);
+    HoodieOfflineJobTestBase.TestHelpers.assertNClusteringCommits(0, 
tableBasePath);
 
     // validate that no records match the clustering instant.
     String[] fullPartitionPaths = new 
String[dataGen.getPartitionPaths().length];
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java
index 689d76f5525..8fbb3210a71 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java
@@ -87,8 +87,8 @@ public class TestHoodieCompactorJob extends 
HoodieOfflineJobTestBase {
     HoodieCompactor hoodieCompactorSchedule =
         init(tableBasePath, true, "SCHEDULE", false);
     hoodieCompactorSchedule.compact(0);
-    TestHelpers.assertNCompletedCommits(2, tableBasePath, fs);
-    TestHelpers.assertNCleanCommits(0, tableBasePath, fs);
+    TestHelpers.assertNCompletedCommits(2, tableBasePath);
+    TestHelpers.assertNCleanCommits(0, tableBasePath);
 
     writeData(true, HoodieActiveTimeline.createNewInstantTime(), 100, true);
     writeData(true, HoodieActiveTimeline.createNewInstantTime(), 100, true);
@@ -97,8 +97,8 @@ public class TestHoodieCompactorJob extends 
HoodieOfflineJobTestBase {
     HoodieCompactor hoodieCompactorExecute =
         init(tableBasePath, false, "EXECUTE", true);
     hoodieCompactorExecute.compact(0);
-    TestHelpers.assertNCompletedCommits(5, tableBasePath, fs);
-    TestHelpers.assertNCleanCommits(1, tableBasePath, fs);
+    TestHelpers.assertNCompletedCommits(5, tableBasePath);
+    TestHelpers.assertNCleanCommits(1, tableBasePath);
   }
 
   // -------------------------------------------------------------------------

Reply via email to