This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 456f6731cc4fb29abbc3c9fbd51a9c798efab310
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Sep 12 02:04:24 2023 +0530

    [HUDI-6753] Fix parquet inline reading flaky test (#9618)
---
 .../deltastreamer/HoodieDeltaStreamerTestBase.java | 269 +++++++++++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 472 +++++----------------
 .../TestHoodieDeltaStreamerDAGExecution.java       |   4 +-
 .../TestHoodieDeltaStreamerWithMultiWriter.java    | 127 +++---
 .../TestHoodieMultiTableDeltaStreamer.java         |  12 +-
 .../utilities/deltastreamer/TestTransformer.java   |   4 +-
 .../utilities/testutils/UtilitiesTestBase.java     |   3 +-
 7 files changed, 462 insertions(+), 429 deletions(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 3c5b45b35c1..b117b2001fa 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -21,6 +21,7 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -34,6 +35,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.utilities.config.SourceTestConfig;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.sources.TestParquetDFSSourceEmptyBatch;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -41,18 +43,27 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
 import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
@@ -62,9 +73,14 @@ import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NA
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
+import static org.apache.hudi.utilities.streamer.HoodieStreamer.CHECKPOINT_KEY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieDeltaStreamerTestBase.class);
+
   static final Random RANDOM = new Random();
   static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
   static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
@@ -111,6 +127,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
   protected static String defaultSchemaProviderClassName = 
FilebasedSchemaProvider.class.getName();
   protected static int testNum = 1;
 
+  Map<String, String> hudiOpts = new HashMap<>();
+
   protected static void prepareTestSetup() throws IOException {
     PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
     ORC_SOURCE_ROOT = basePath + "/orcFiles";
@@ -230,8 +248,9 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
   }
 
   @BeforeEach
-  public void resetTestDataSource() {
+  public void setupTest() {
     TestDataSource.returnEmptyBatch = false;
+    hudiOpts = new HashMap<>();
   }
 
   protected static void 
populateInvalidTableConfigFilePathProps(TypedProperties props, String 
dfsBasePath) {
@@ -431,4 +450,252 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
         
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
   }
 
+  void assertRecordCount(long expected, String tablePath, SQLContext 
sqlContext) {
+    sqlContext.clearCache();
+    long recordCount = 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath).count();
+    assertEquals(expected, recordCount);
+  }
+
+  void assertDistinctRecordCount(long expected, String tablePath, SQLContext 
sqlContext) {
+    sqlContext.clearCache();
+    long recordCount = 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
+    assertEquals(expected, recordCount);
+  }
+
+  List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
+    sqlContext.clearCache();
+    List<Row> rows = 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath)
+        .groupBy("_hoodie_commit_time").count()
+        .sort("_hoodie_commit_time").collectAsList();
+    return rows;
+  }
+
+  void assertDistanceCount(long expected, String tablePath, SQLContext 
sqlContext) {
+    sqlContext.clearCache();
+    
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
+    long recordCount =
+        sqlContext.sql("select * from tmp_trips where haversine_distance is 
not NULL").count();
+    assertEquals(expected, recordCount);
+  }
+
+  void assertDistanceCountWithExactValue(long expected, String tablePath, 
SQLContext sqlContext) {
+    sqlContext.clearCache();
+    
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
+    long recordCount =
+        sqlContext.sql("select * from tmp_trips where haversine_distance = 
1.0").count();
+    assertEquals(expected, recordCount);
+  }
+
+  Map<String, Long> getPartitionRecordCount(String basePath, SQLContext 
sqlContext) {
+    sqlContext.clearCache();
+    List<Row> rows = 
sqlContext.read().options(hudiOpts).format("org.apache.hudi")
+        .load(basePath)
+        .groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
+        .count()
+        .collectAsList();
+    Map<String, Long> partitionRecordCount = new HashMap<>();
+    rows.stream().forEach(row -> partitionRecordCount.put(row.getString(0), 
row.getLong(1)));
+    return partitionRecordCount;
+  }
+
+  void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String 
partitionToValidate) {
+    sqlContext.clearCache();
+    assertEquals(0, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(basePath)
+        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = " + 
partitionToValidate)
+        .count());
+  }
+
+  static class TestHelpers {
+
+    static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, 
WriteOperationType op) {
+      return makeConfig(basePath, op, 
Collections.singletonList(TestHoodieDeltaStreamer.DropAllTransformer.class.getName()));
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op) {
+      return makeConfig(basePath, op, 
Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, List<String> transformerClassNames) {
+      return makeConfig(basePath, op, transformerClassNames, 
PROPS_FILENAME_TEST_SOURCE, false);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, List<String> transformerClassNames,
+                                                 String propsFilename, boolean 
enableHiveSync) {
+      return makeConfig(basePath, op, transformerClassNames, propsFilename, 
enableHiveSync, true,
+          false, null, null);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, List<String> transformerClassNames,
+                                                 String propsFilename, boolean 
enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
+                                                 String payloadClassName, 
String tableType) {
+      return makeConfig(basePath, op, TestDataSource.class.getName(), 
transformerClassNames, propsFilename, enableHiveSync,
+          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, 
tableType, "timestamp", null);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, String sourceClassName,
+                                                 List<String> 
transformerClassNames, String propsFilename, boolean enableHiveSync, boolean 
useSchemaProviderClass,
+                                                 int sourceLimit, boolean 
updatePayloadClass, String payloadClassName, String tableType, String 
sourceOrderingField,
+                                                 String checkpoint) {
+      return makeConfig(basePath, op, sourceClassName, transformerClassNames, 
propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, 
updatePayloadClass, payloadClassName,
+          tableType, sourceOrderingField, checkpoint, false);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, String sourceClassName,
+                                                 List<String> 
transformerClassNames, String propsFilename, boolean enableHiveSync, boolean 
useSchemaProviderClass,
+                                                 int sourceLimit, boolean 
updatePayloadClass, String payloadClassName, String tableType, String 
sourceOrderingField,
+                                                 String checkpoint, boolean 
allowCommitOnNoCheckpointChange) {
+      HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+      cfg.targetBasePath = basePath;
+      cfg.targetTableName = "hoodie_trips";
+      cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
+      cfg.sourceClassName = sourceClassName;
+      cfg.transformerClassNames = transformerClassNames;
+      cfg.operation = op;
+      cfg.enableHiveSync = enableHiveSync;
+      cfg.sourceOrderingField = sourceOrderingField;
+      cfg.propsFilePath = UtilitiesTestBase.basePath + "/" + propsFilename;
+      cfg.sourceLimit = sourceLimit;
+      cfg.checkpoint = checkpoint;
+      if (updatePayloadClass) {
+        cfg.payloadClassName = payloadClassName;
+      }
+      if (useSchemaProviderClass) {
+        cfg.schemaProviderClassName = defaultSchemaProviderClassName;
+      }
+      cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
+      return cfg;
+    }
+
+    static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String 
srcBasePath, String basePath, WriteOperationType op,
+                                                               boolean 
addReadLatestOnMissingCkpt, String schemaProviderClassName) {
+      HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+      cfg.targetBasePath = basePath;
+      cfg.targetTableName = "hoodie_trips_copy";
+      cfg.tableType = "COPY_ON_WRITE";
+      cfg.sourceClassName = HoodieIncrSource.class.getName();
+      cfg.operation = op;
+      cfg.sourceOrderingField = "timestamp";
+      cfg.propsFilePath = UtilitiesTestBase.basePath + 
"/test-downstream-source.properties";
+      cfg.sourceLimit = 1000;
+      if (null != schemaProviderClassName) {
+        cfg.schemaProviderClassName = schemaProviderClassName;
+      }
+      List<String> cfgs = new ArrayList<>();
+      
cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" 
+ addReadLatestOnMissingCkpt);
+      cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
+      // No partition
+      
cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
+      cfg.configs = cfgs;
+      return cfg;
+    }
+
+    static void assertAtleastNCompactionCommits(int minExpected, String 
tablePath, FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numCompactionCommits = timeline.countInstants();
+      assertTrue(minExpected <= numCompactionCommits, "Got=" + 
numCompactionCommits + ", exp >=" + minExpected);
+    }
+
+    static void assertAtleastNDeltaCommits(int minExpected, String tablePath, 
FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numDeltaCommits = timeline.countInstants();
+      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
+    }
+
+    static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, 
String lastSuccessfulCommit, String tablePath, FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numCompactionCommits = timeline.countInstants();
+      assertTrue(minExpected <= numCompactionCommits, "Got=" + 
numCompactionCommits + ", exp >=" + minExpected);
+    }
+
+    static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String 
lastSuccessfulCommit, String tablePath, FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
+      HoodieTimeline timeline = 
meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numDeltaCommits = timeline.countInstants();
+      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
+    }
+
+    static String assertCommitMetadata(String expected, String tablePath, 
FileSystem fs, int totalCommits)
+        throws IOException {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      HoodieInstant lastInstant = timeline.lastInstant().get();
+      HoodieCommitMetadata commitMetadata =
+          
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), 
HoodieCommitMetadata.class);
+      assertEquals(totalCommits, timeline.countInstants());
+      assertEquals(expected, commitMetadata.getMetadata(CHECKPOINT_KEY));
+      return lastInstant.getTimestamp();
+    }
+
+    static void waitTillCondition(Function<Boolean, Boolean> condition, Future 
dsFuture, long timeoutInSecs) throws Exception {
+      Future<Boolean> res = Executors.newSingleThreadExecutor().submit(() -> {
+        boolean ret = false;
+        while (!ret && !dsFuture.isDone()) {
+          try {
+            Thread.sleep(3000);
+            ret = condition.apply(true);
+          } catch (Throwable error) {
+            LOG.warn("Got error :", error);
+            ret = false;
+          }
+        }
+        return ret;
+      });
+      res.get(timeoutInSecs, TimeUnit.SECONDS);
+    }
+
+    static void assertAtLeastNCommits(int minExpected, String tablePath, 
FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().filterCompletedInstants();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numDeltaCommits = timeline.countInstants();
+      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
+    }
+
+    static void assertAtLeastNReplaceCommits(int minExpected, String 
tablePath, FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getCompletedReplaceTimeline();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numDeltaCommits = timeline.countInstants();
+      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
+    }
+
+    static void assertPendingIndexCommit(String tablePath, FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numIndexCommits = timeline.countInstants();
+      assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
+    }
+
+    static void assertCompletedIndexCommit(String tablePath, FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numIndexCommits = timeline.countInstants();
+      assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
+    }
+
+    static void assertNoReplaceCommits(String tablePath, FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().getCompletedReplaceTimeline();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numDeltaCommits = timeline.countInstants();
+      assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 
0);
+    }
+
+    static void assertAtLeastNReplaceRequests(int minExpected, String 
tablePath, FileSystem fs) {
+      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
+      HoodieTimeline timeline = 
meta.getActiveTimeline().filterPendingReplaceTimeline();
+      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
+      int numDeltaCommits = timeline.countInstants();
+      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
+    }
+  }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 2a7db25647e..32af50eee64 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -77,7 +77,6 @@ import org.apache.hudi.utilities.config.SourceTestConfig;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.CsvDFSSource;
-import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JdbcSource;
 import org.apache.hudi.utilities.sources.JsonKafkaSource;
@@ -111,7 +110,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.api.java.UDF4;
 import org.apache.spark.sql.functions;
@@ -183,6 +181,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestHoodieDeltaStreamer.class);
 
+  private void addRecordMerger(HoodieRecordType type, List<String> 
hoodieConfig) {
+    if (type == HoodieRecordType.SPARK) {
+      Map<String, String> opts = new HashMap<>();
+      opts.put(HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), 
HoodieSparkRecordMerger.class.getName());
+      opts.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),"parquet");
+      for (Map.Entry<String, String> entry : opts.entrySet()) {
+        hoodieConfig.add(String.format("%s=%s", entry.getKey(), 
entry.getValue()));
+      }
+      hudiOpts.putAll(opts);
+    }
+  }
+
   protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String 
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType 
recordType) throws IOException {
     return initialHoodieDeltaStreamer(tableBasePath, totalRecords, 
asyncCluster, recordType, WriteOperationType.INSERT);
   }
@@ -195,7 +205,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
   protected HoodieDeltaStreamer initialHoodieDeltaStreamer(String 
tableBasePath, int totalRecords, String asyncCluster, HoodieRecordType 
recordType,
                                                              
WriteOperationType writeOperationType, Set<String> customConfigs) throws 
IOException {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
writeOperationType);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
asyncCluster, ""));
@@ -216,261 +226,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
                                                            Boolean 
retryLastFailedClusteringJob, HoodieRecordType recordType) {
     HoodieClusteringJob.Config scheduleClusteringConfig = 
buildHoodieClusteringUtilConfig(tableBasePath,
         clusteringInstantTime, runSchedule, scheduleAndExecute, 
retryLastFailedClusteringJob);
-    TestHelpers.addRecordMerger(recordType, scheduleClusteringConfig.configs);
+    addRecordMerger(recordType, scheduleClusteringConfig.configs);
     scheduleClusteringConfig.configs.addAll(getAllMultiWriterConfigs());
     return new HoodieClusteringJob(jsc, scheduleClusteringConfig);
   }
 
-  static class TestHelpers {
-
-    static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, 
WriteOperationType op) {
-      return makeConfig(basePath, op, 
Collections.singletonList(DropAllTransformer.class.getName()));
-    }
-
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op) {
-      return makeConfig(basePath, op, 
Collections.singletonList(TripsWithDistanceTransformer.class.getName()));
-    }
-
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, List<String> transformerClassNames) {
-      return makeConfig(basePath, op, transformerClassNames, 
PROPS_FILENAME_TEST_SOURCE, false);
-    }
-
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, List<String> transformerClassNames,
-                                                 String propsFilename, boolean 
enableHiveSync) {
-      return makeConfig(basePath, op, transformerClassNames, propsFilename, 
enableHiveSync, true,
-          false, null, null);
-    }
-
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, List<String> transformerClassNames,
-                                                 String propsFilename, boolean 
enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
-                                                 String payloadClassName, 
String tableType) {
-      return makeConfig(basePath, op, TestDataSource.class.getName(), 
transformerClassNames, propsFilename, enableHiveSync,
-          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, 
tableType, "timestamp", null);
-    }
-
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, String sourceClassName,
-                                                 List<String> 
transformerClassNames, String propsFilename, boolean enableHiveSync, boolean 
useSchemaProviderClass,
-                                                 int sourceLimit, boolean 
updatePayloadClass, String payloadClassName, String tableType, String 
sourceOrderingField,
-                                                 String checkpoint) {
-      return makeConfig(basePath, op, sourceClassName, transformerClassNames, 
propsFilename, enableHiveSync, useSchemaProviderClass, sourceLimit, 
updatePayloadClass, payloadClassName,
-          tableType, sourceOrderingField, checkpoint, false);
-    }
-
-    static HoodieDeltaStreamer.Config makeConfig(String basePath, 
WriteOperationType op, String sourceClassName,
-                                                 List<String> 
transformerClassNames, String propsFilename, boolean enableHiveSync, boolean 
useSchemaProviderClass,
-                                                 int sourceLimit, boolean 
updatePayloadClass, String payloadClassName, String tableType, String 
sourceOrderingField,
-                                                 String checkpoint, boolean 
allowCommitOnNoCheckpointChange) {
-      HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      cfg.targetBasePath = basePath;
-      cfg.targetTableName = "hoodie_trips";
-      cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
-      cfg.sourceClassName = sourceClassName;
-      cfg.transformerClassNames = transformerClassNames;
-      cfg.operation = op;
-      cfg.enableHiveSync = enableHiveSync;
-      cfg.sourceOrderingField = sourceOrderingField;
-      cfg.propsFilePath = UtilitiesTestBase.basePath + "/" + propsFilename;
-      cfg.sourceLimit = sourceLimit;
-      cfg.checkpoint = checkpoint;
-      if (updatePayloadClass) {
-        cfg.payloadClassName = payloadClassName;
-      }
-      if (useSchemaProviderClass) {
-        cfg.schemaProviderClassName = defaultSchemaProviderClassName;
-      }
-      cfg.allowCommitOnNoCheckpointChange = allowCommitOnNoCheckpointChange;
-      return cfg;
-    }
-
-    static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String 
srcBasePath, String basePath, WriteOperationType op,
-                                                               boolean 
addReadLatestOnMissingCkpt, String schemaProviderClassName) {
-      HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      cfg.targetBasePath = basePath;
-      cfg.targetTableName = "hoodie_trips_copy";
-      cfg.tableType = "COPY_ON_WRITE";
-      cfg.sourceClassName = HoodieIncrSource.class.getName();
-      cfg.operation = op;
-      cfg.sourceOrderingField = "timestamp";
-      cfg.propsFilePath = UtilitiesTestBase.basePath + 
"/test-downstream-source.properties";
-      cfg.sourceLimit = 1000;
-      if (null != schemaProviderClassName) {
-        cfg.schemaProviderClassName = schemaProviderClassName;
-      }
-      List<String> cfgs = new ArrayList<>();
-      
cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" 
+ addReadLatestOnMissingCkpt);
-      cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
-      // No partition
-      
cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
-      cfg.configs = cfgs;
-      return cfg;
-    }
-
-    static void addRecordMerger(HoodieRecordType type, List<String> 
hoodieConfig) {
-      if (type == HoodieRecordType.SPARK) {
-        hoodieConfig.add(String.format("%s=%s", 
HoodieWriteConfig.RECORD_MERGER_IMPLS.key(), 
HoodieSparkRecordMerger.class.getName()));
-        hoodieConfig.add(String.format("%s=%s", 
HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),"parquet"));
-      }
-    }
-
-    static void assertRecordCount(long expected, String tablePath, SQLContext 
sqlContext) {
-      sqlContext.clearCache();
-      long recordCount = 
sqlContext.read().format("org.apache.hudi").load(tablePath).count();
-      assertEquals(expected, recordCount);
-    }
-
-    static Map<String, Long> getPartitionRecordCount(String basePath, 
SQLContext sqlContext) {
-      sqlContext.clearCache();
-      List<Row> rows = 
sqlContext.read().format("org.apache.hudi").load(basePath).groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count().collectAsList();
-      Map<String, Long> partitionRecordCount = new HashMap<>();
-      rows.stream().forEach(row -> partitionRecordCount.put(row.getString(0), 
row.getLong(1)));
-      return partitionRecordCount;
-    }
-
-    static void assertNoPartitionMatch(String basePath, SQLContext sqlContext, 
String partitionToValidate) {
-      sqlContext.clearCache();
-      assertEquals(0, 
sqlContext.read().format("org.apache.hudi").load(basePath).filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD
 + " = " + partitionToValidate).count());
-    }
-
-    static void assertDistinctRecordCount(long expected, String tablePath, 
SQLContext sqlContext) {
-      sqlContext.clearCache();
-      long recordCount = 
sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
-      assertEquals(expected, recordCount);
-    }
-
-    static List<Row> countsPerCommit(String tablePath, SQLContext sqlContext) {
-      sqlContext.clearCache();
-      List<Row> rows = 
sqlContext.read().format("org.apache.hudi").load(tablePath)
-          .groupBy("_hoodie_commit_time").count()
-          .sort("_hoodie_commit_time").collectAsList();
-      return rows;
-    }
-
-    static void assertDistanceCount(long expected, String tablePath, 
SQLContext sqlContext) {
-      sqlContext.clearCache();
-      
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
-      long recordCount =
-          sqlContext.sql("select * from tmp_trips where haversine_distance is 
not NULL").count();
-      assertEquals(expected, recordCount);
-    }
-
-    static void assertDistanceCountWithExactValue(long expected, String 
tablePath, SQLContext sqlContext) {
-      sqlContext.clearCache();
-      
sqlContext.read().format("org.apache.hudi").load(tablePath).registerTempTable("tmp_trips");
-      long recordCount =
-          sqlContext.sql("select * from tmp_trips where haversine_distance = 
1.0").count();
-      assertEquals(expected, recordCount);
-    }
-
-    static void assertAtleastNCompactionCommits(int minExpected, String 
tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numCompactionCommits = timeline.countInstants();
-      assertTrue(minExpected <= numCompactionCommits, "Got=" + 
numCompactionCommits + ", exp >=" + minExpected);
-    }
-
-    static void assertAtleastNDeltaCommits(int minExpected, String tablePath, 
FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numDeltaCommits = timeline.countInstants();
-      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
-    }
-
-    static void assertAtleastNCompactionCommitsAfterCommit(int minExpected, 
String lastSuccessfulCommit, String tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numCompactionCommits = timeline.countInstants();
-      assertTrue(minExpected <= numCompactionCommits, "Got=" + 
numCompactionCommits + ", exp >=" + minExpected);
-    }
-
-    static void assertAtleastNDeltaCommitsAfterCommit(int minExpected, String 
lastSuccessfulCommit, String tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
-      HoodieTimeline timeline = 
meta.reloadActiveTimeline().getDeltaCommitTimeline().findInstantsAfter(lastSuccessfulCommit).filterCompletedInstants();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numDeltaCommits = timeline.countInstants();
-      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
-    }
-
-    static String assertCommitMetadata(String expected, String tablePath, 
FileSystem fs, int totalCommits)
-        throws IOException {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-      HoodieInstant lastInstant = timeline.lastInstant().get();
-      HoodieCommitMetadata commitMetadata =
-          
HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(lastInstant).get(), 
HoodieCommitMetadata.class);
-      assertEquals(totalCommits, timeline.countInstants());
-      assertEquals(expected, commitMetadata.getMetadata(CHECKPOINT_KEY));
-      return lastInstant.getTimestamp();
-    }
-
-    static void waitTillCondition(Function<Boolean, Boolean> condition, Future 
dsFuture, long timeoutInSecs) throws Exception {
-      Future<Boolean> res = Executors.newSingleThreadExecutor().submit(() -> {
-        boolean ret = false;
-        while (!ret && !dsFuture.isDone()) {
-          try {
-            Thread.sleep(3000);
-            ret = condition.apply(true);
-          } catch (Throwable error) {
-            LOG.warn("Got error :", error);
-            ret = false;
-          }
-        }
-        return ret;
-      });
-      res.get(timeoutInSecs, TimeUnit.SECONDS);
-    }
-
-    static void assertAtLeastNCommits(int minExpected, String tablePath, 
FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().filterCompletedInstants();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numDeltaCommits = timeline.countInstants();
-      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
-    }
-
-    static void assertAtLeastNReplaceCommits(int minExpected, String 
tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getCompletedReplaceTimeline();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numDeltaCommits = timeline.countInstants();
-      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
-    }
-
-    static void assertPendingIndexCommit(String tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getAllCommitsTimeline().filterPendingIndexTimeline();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numIndexCommits = timeline.countInstants();
-      assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
-    }
-
-    static void assertCompletedIndexCommit(String tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getAllCommitsTimeline().filterCompletedIndexTimeline();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numIndexCommits = timeline.countInstants();
-      assertEquals(1, numIndexCommits, "Got=" + numIndexCommits + ", exp=1");
-    }
-
-    static void assertNoReplaceCommits(String tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getCompletedReplaceTimeline();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numDeltaCommits = timeline.countInstants();
-      assertEquals(0, numDeltaCommits, "Got=" + numDeltaCommits + ", exp =" + 
0);
-    }
-
-    static void assertAtLeastNReplaceRequests(int minExpected, String 
tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().filterPendingReplaceTimeline();
-      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants());
-      int numDeltaCommits = timeline.countInstants();
-      assertTrue(minExpected <= numDeltaCommits, "Got=" + numDeltaCommits + ", 
exp >=" + minExpected);
-    }
-  }
-
   @Test
   public void testProps() {
     TypedProperties props =
@@ -696,7 +456,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     syncAndAssertRecordCount(cfg, 1000,  tableBasePath,  "00000",  1);
 
     // No new data => no commits.
@@ -707,7 +467,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
     syncAndAssertRecordCount(cfg,1950, tableBasePath, "00001", 2);
-    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
     // Perform bootstrap with tableBasePath as source
@@ -732,7 +492,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     LOG.info("Schema :");
     res.printSchema();
 
-    TestHelpers.assertRecordCount(1950, newDatasetBasePath, sqlContext);
+    assertRecordCount(1950, newDatasetBasePath, sqlContext);
     res.registerTempTable("bootstrapped");
     assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from 
bootstrapped").count());
     // NOTE: To fetch record's count Spark will optimize the query fetching 
minimal possible amount
@@ -767,7 +527,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.operation = WriteOperationType.UPSERT;
     cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + 
"=differentval");
     assertThrows(HoodieException.class, () -> 
syncAndAssertRecordCount(cfg,1000,tableBasePath,"00000",1));
-    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1000, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
 
@@ -776,14 +536,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     newCfg.sourceLimit = 2000;
     newCfg.operation = WriteOperationType.UPSERT;
     syncAndAssertRecordCount(newCfg, 1950, tableBasePath, "00001", 2);
-    List<Row> counts2 = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    List<Row> counts2 = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts2.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
   }
 
   private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, 
Integer expected, String tableBasePath, String metadata, Integer totalCommits) 
throws Exception {
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext);
+    assertRecordCount(expected, tableBasePath, sqlContext);
+    assertDistanceCount(expected, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs, 
totalCommits);
   }
 
@@ -796,7 +556,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // Insert data produced with Schema A, pass Schema A
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ basePath + "/source.avsc");
     cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" 
+ basePath + "/source.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
@@ -804,13 +564,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
 + "=false");
     }
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+    assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // Upsert data produced with Schema B, pass Schema B
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ basePath + "/source.avsc");
     cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" 
+ basePath + "/source_evolved.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
@@ -819,9 +579,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
-    TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
+    assertRecordCount(1450, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
-    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1450, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
     
sqlContext.read().format("org.apache.hudi").load(tableBasePath).createOrReplaceTempView("tmp_trips");
@@ -835,7 +595,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ basePath + "/source.avsc");
     if (useUserProvidedSchema) {
       
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + 
basePath + "/source_evolved.avsc");
@@ -846,9 +606,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // again, 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
-    TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
+    assertRecordCount(1900, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
-    counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
     TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
@@ -882,14 +642,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   public void testUpsertsCOW_ContinuousModeDisabled(HoodieRecordType 
recordType) throws Exception {
     String tableBasePath = basePath  + "/non_continuous_cow";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
     cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), 
"CONSOLE"));
     cfg.continuousMode = false;
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
-    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, 
sqlContext);
+    assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
     assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be 
shutdown");
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
@@ -913,14 +673,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   public void testUpsertsMOR_ContinuousModeDisabled(HoodieRecordType 
recordType) throws Exception {
     String tableBasePath = basePath  + "/non_continuous_mor";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
     cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), 
"CONSOLE"));
     cfg.continuousMode = false;
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
-    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, 
sqlContext);
+    assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
     assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be 
shutdown");
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
@@ -935,7 +695,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     int totalRecords = 3000;
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     if (testShutdownGracefully) {
       cfg.postWriteTerminationStrategyClass = 
NoNewDataTerminationStrategy.class.getName();
@@ -951,8 +711,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       } else {
         TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, fs);
       }
-      TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
-      TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext);
+      assertRecordCount(totalRecords, tableBasePath, sqlContext);
+      assertDistanceCount(totalRecords, tableBasePath, sqlContext);
       if (testShutdownGracefully) {
         TestDataSource.returnEmptyBatch = true;
       }
@@ -1019,7 +779,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", 
"2", "", ""));
@@ -1085,7 +845,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // sync twice and trigger compaction
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, 
sqlContext);
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
     prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null, dataGenerator, "001");
     deltaStreamer.sync();
     TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
@@ -1118,7 +878,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // Step 1 : Prepare and insert data without archival and cleaner.
     // Make sure that there are 6 commits including 2 replacecommits completed.
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", 
"2", "", ""));
@@ -1186,7 +946,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       configs.add(String.format("%s=%s", 
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
           InProcessLockProvider.class.getName()));
     }
-    TestHelpers.addRecordMerger(recordType, configs);
+    addRecordMerger(recordType, configs);
     cfg.configs = configs;
     cfg.continuousMode = false;
     // timeline as of now. no cleaner and archival kicked in.
@@ -1361,7 +1121,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
@@ -1373,7 +1133,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // There should be 4 commits, one of which should be a replace commit
     TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
     TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
-    TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath, 
sqlContext);
+    assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext);
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
@@ -1392,7 +1152,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
@@ -1404,7 +1164,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // There should be 4 commits, one of which should be a replace commit
     TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
     TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
-    TestHelpers.assertDistinctRecordCount(1900, tableBasePath, sqlContext);
+    assertDistinctRecordCount(1900, tableBasePath, sqlContext);
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
@@ -1418,7 +1178,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
@@ -1431,7 +1191,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // There should be 4 commits, one of which should be a replace commit
     TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
     TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
-    TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath, 
sqlContext);
+    assertDistinctRecordCount(totalRecords, tableBasePath, sqlContext);
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
@@ -1443,7 +1203,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // ingest data
     int totalRecords = 3000;
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = false;
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "false", 
"0", "false", "0"));
@@ -1548,32 +1308,32 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // Initial bulk insert to ingest to first hudi table
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, true);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     // NOTE: We should not have need to set below config, 'datestr' should 
have assumed date partitioning
     
cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath, 
sqlContext);
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    assertDistanceCount(1000, tableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
     String lastInstantForUpstreamTable = 
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // Now incrementally pull from the above hudi table and ingest to second 
table
     HoodieDeltaStreamer.Config downstreamCfg =
         TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, 
downstreamTableBasePath, WriteOperationType.BULK_INSERT,
             true, null);
-    TestHelpers.addRecordMerger(recordType, downstreamCfg.configs);
+    addRecordMerger(recordType, downstreamCfg.configs);
     new HoodieDeltaStreamer(downstreamCfg, jsc, fs, 
hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
-    TestHelpers.assertDistanceCountWithExactValue(1000, 
downstreamTableBasePath, sqlContext);
+    assertRecordCount(1000, downstreamTableBasePath, sqlContext);
+    assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
     TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
 
     // No new data => no commits for upstream table
     cfg.sourceLimit = 0;
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath, 
sqlContext);
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    assertDistanceCount(1000, tableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // with no change in upstream table, no change in downstream too when 
pulled.
@@ -1581,35 +1341,35 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, 
downstreamTableBasePath,
             WriteOperationType.BULK_INSERT, true, 
DummySchemaProvider.class.getName());
     new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
-    TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
-    TestHelpers.assertDistanceCountWithExactValue(1000, 
downstreamTableBasePath, sqlContext);
+    assertRecordCount(1000, downstreamTableBasePath, sqlContext);
+    assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
     TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
 
     // upsert() #1 on upstream hudi table
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCountWithExactValue(1950, tableBasePath, 
sqlContext);
+    assertRecordCount(1950, tableBasePath, sqlContext);
+    assertDistanceCount(1950, tableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext);
     lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", 
tableBasePath, fs, 2);
-    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
     // Incrementally pull changes in upstream hudi table and apply to 
downstream table
     downstreamCfg =
         TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, 
downstreamTableBasePath, WriteOperationType.UPSERT,
             false, null);
-    TestHelpers.addRecordMerger(recordType, downstreamCfg.configs);
+    addRecordMerger(recordType, downstreamCfg.configs);
     downstreamCfg.sourceLimit = 2000;
     new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
-    TestHelpers.assertRecordCount(2000, downstreamTableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
-    TestHelpers.assertDistanceCountWithExactValue(2000, 
downstreamTableBasePath, sqlContext);
+    assertRecordCount(2000, downstreamTableBasePath, sqlContext);
+    assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(2000, downstreamTableBasePath, 
sqlContext);
     String finalInstant =
         TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 2);
-    counts = TestHelpers.countsPerCommit(downstreamTableBasePath, sqlContext);
+    counts = countsPerCommit(downstreamTableBasePath, sqlContext);
     assertEquals(2000, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
     // Test Hive integration
@@ -1648,7 +1408,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, "MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext);
+    assertRecordCount(1000, dataSetBasePath, sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
@@ -1674,7 +1434,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
           Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
           true, true, PartialUpdateAvroPayload.class.getName(), 
"MERGE_ON_READ");
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext);
+    assertRecordCount(1000, dataSetBasePath, sqlContext);
 
     //now assert that hoodie.properties file now has updated payload class name
     Properties props = new Properties();
@@ -1693,7 +1453,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, null);
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext);
+    assertRecordCount(1000, dataSetBasePath, sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
@@ -1719,9 +1479,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+    assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // Generate the same 1000 records + 1000 new ones for upsert
@@ -1729,10 +1489,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.INSERT;
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(2000, tableBasePath, sqlContext);
+    assertRecordCount(2000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
     // 1000 records for commit 00000 & 1000 for commit 00001
-    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1000, counts.get(0).getLong(1));
     assertEquals(1000, counts.get(1).getLong(1));
 
@@ -1740,7 +1500,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieTableMetaClient mClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
     HoodieInstant lastFinished = 
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
     HoodieDeltaStreamer.Config cfg2 = 
TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
-    TestHelpers.addRecordMerger(recordType, cfg2.configs);
+    addRecordMerger(recordType, cfg2.configs);
     cfg2.filterDupes = false;
     cfg2.sourceLimit = 2000;
     cfg2.operation = WriteOperationType.UPSERT;
@@ -1817,13 +1577,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
             useSchemaProvider, 100000, false, null, null, "timestamp", null), 
jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, 
sqlContext);
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
 
     if (testEmptyBatch) {
       prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null);
       deltaStreamer.sync();
       // since we mimic'ed empty batch, total records should be same as first 
sync().
-      TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, 
sqlContext);
+      assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
       HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
 
       // validate table schema fetches valid schema from last but one commit.
@@ -1834,7 +1594,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // proceed w/ non empty batch.
     prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, 
null);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(parquetRecordsCount + 100, tableBasePath, 
sqlContext);
+    assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
     // validate commit metadata for all completed commits to have valid schema 
in extra metadata.
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
     
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants().forEach(entry
 -> assertValidSchemaInCommitMetadata(entry, metaClient));
@@ -1875,7 +1635,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
             useSchemaProvider, 100000, false, null, null, "timestamp", null), 
jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath, sqlContext);
+    assertRecordCount(ORC_NUM_RECORDS, tableBasePath, sqlContext);
     testNum++;
   }
 
@@ -1925,7 +1685,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
             true, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(parquetRecords, tableBasePath, sqlContext);
+    assertRecordCount(parquetRecords, tableBasePath, sqlContext);
     deltaStreamer.shutdownGracefully();
 
     // prep json kafka source
@@ -1940,13 +1700,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     deltaStreamer.sync();
     // if auto reset value is set to LATEST, this all kafka records so far may 
not be synced.
     int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : 
JSON_KAFKA_NUM_RECORDS);
-    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, 
sqlContext);
+    assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
 
     // verify 2nd batch to test LATEST auto reset value.
     prepareJsonKafkaDFSFiles(20, false, topicName);
     totalExpectedRecords += 20;
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, 
sqlContext);
+    assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
     testNum++;
   }
 
@@ -1961,14 +1721,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
             true, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, 
sqlContext);
+    assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
 
     int totalRecords = JSON_KAFKA_NUM_RECORDS;
     int records = 10;
     totalRecords += records;
     prepareJsonKafkaDFSFiles(records, false, topicName);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
+    assertRecordCount(totalRecords, tableBasePath, sqlContext);
   }
 
   @Test
@@ -2022,7 +1782,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             true, 100000, false, null,
             null, "timestamp", String.valueOf(System.currentTimeMillis())), 
jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, 
sqlContext);
+    assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
 
     prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
     deltaStreamer = new HoodieDeltaStreamer(
@@ -2031,7 +1791,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             true, 100000, false, null, null,
             "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, 
sqlContext);
+    assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
   }
 
   @Disabled("HUDI-6609")
@@ -2055,7 +1815,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     //parquetCfg.continuousMode = false;
     HoodieDeltaStreamer parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
     parquetDs.sync();
-    TestHelpers.assertRecordCount(100, tableBasePath, sqlContext);
+    assertRecordCount(100, tableBasePath, sqlContext);
 
     // prep json kafka source
     topicName = "topic" + testNum;
@@ -2070,13 +1830,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             true, Integer.MAX_VALUE, false, null, null, "timestamp", null), 
jsc);
     kafkaDs.sync();
     int totalExpectedRecords = parquetRecords + 20;
-    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, 
sqlContext);
+    assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
     //parquet again
     prepareParquetDFSUpdates(parquetRecords, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, 
HoodieTestDataGenerator.AVRO_TRIP_SCHEMA,
         dataGenerator, "001");
     parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
     parquetDs.sync();
-    TestHelpers.assertRecordCount(parquetRecords * 2 + 20, tableBasePath, 
sqlContext);
+    assertRecordCount(parquetRecords * 2 + 20, tableBasePath, sqlContext);
 
     HoodieTableMetaClient metaClient = 
HoodieTestUtils.init(jsc.hadoopConfiguration(), tableBasePath);
     List<HoodieInstant> instants = 
metaClient.getCommitsTimeline().getInstants();
@@ -2172,7 +1932,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
               null, PROPS_FILENAME_TEST_PARQUET, false,
               useSchemaProvider, 100000, false, null, null, "timestamp", 
null), jsc);
       deltaStreamer.sync();
-      TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, 
sqlContext);
+      assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
     } else {
       assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(
           TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
@@ -2266,7 +2026,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
             useSchemaProvider, 1000, false, null, null, sourceOrderingField, 
null), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(CSV_NUM_RECORDS, tableBasePath, sqlContext);
+    assertRecordCount(CSV_NUM_RECORDS, tableBasePath, sqlContext);
     testNum++;
   }
 
@@ -2386,7 +2146,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             Collections.emptyList(), PROPS_FILENAME_TEST_SQL_SOURCE, false,
             false, 1000, false, null, null, "timestamp", null, true), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, 
sqlContext);
+    assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
   }
 
   @Disabled
@@ -2420,7 +2180,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
       deltaStreamerTestRunner(deltaStreamer, cfg, (r) -> {
         TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + 
((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, fs);
-        TestHelpers.assertRecordCount(numRecords, tableBasePath, sqlContext);
+        assertRecordCount(numRecords, tableBasePath, sqlContext);
         return true;
       });
     } catch (Exception e) {
@@ -2443,7 +2203,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
     //No change as this fails with Path not exist error
     assertThrows(HoodieIncrementalPathNotFoundException.class, () -> new 
HoodieDeltaStreamer(downstreamCfg, jsc).sync());
-    TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext);
+    assertRecordCount(1000, downstreamTableBasePath, sqlContext);
 
     if (downstreamCfg.configs == null) {
       downstreamCfg.configs = new ArrayList<>();
@@ -2506,7 +2266,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             null, PROPS_FILENAME_TEST_PARQUET, false,
             false, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, 
sqlContext);
+    assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
     testNum++;
 
     prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
@@ -2518,7 +2278,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             false, 100000, false, null, null, "timestamp", null), jsc);
     deltaStreamer.sync();
     // No records should match the 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
-    TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+    assertNoPartitionMatch(tableBasePath, sqlContext, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
 
     // There should not be any fileIDs in the deleted partition
     assertTrue(getAllFileIDsInTable(tableBasePath, 
Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
@@ -2544,10 +2304,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, 
WriteOperationType operationType, HoodieRecordType recordType) throws Exception 
{
     // Initial insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    assertDistanceCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // Collect the fileIds before running HoodieDeltaStreamer
@@ -2560,8 +2320,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
 
     if (operationType == WriteOperationType.INSERT_OVERWRITE) {
-      TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-      TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
+      assertRecordCount(1000, tableBasePath, sqlContext);
+      assertDistanceCount(1000, tableBasePath, sqlContext);
       TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
     } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
       HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
@@ -2576,8 +2336,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     cfg.sourceLimit = 1000;
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(950, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(950, tableBasePath, sqlContext);
+    assertRecordCount(950, tableBasePath, sqlContext);
+    assertDistanceCount(950, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
@@ -2621,7 +2381,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     String tableBasePath = basePath + "/test_drop_partition_columns" + 
testNum++;
     // ingest data with dropping partition columns enabled
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.configs.add(String.format("%s=%s", 
HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
@@ -2651,7 +2411,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.forceEmptyMetaSync = true;
 
     new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(0, tableBasePath, sqlContext);
+    assertRecordCount(0, tableBasePath, sqlContext);
 
     // make sure hive table is present
     HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, 
"hive_trips");
@@ -2667,7 +2427,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // default table type is COW
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+    assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
     TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
 
@@ -2690,9 +2450,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
-    TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
+    assertRecordCount(1450, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
-    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1450, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
     TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
     // currently there should be 1 deltacommits now
@@ -2702,9 +2462,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     // total records should be 1900 now
-    TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
+    assertRecordCount(1900, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
-    counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
     TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
     // currently there should be 2 deltacommits now
@@ -2731,11 +2491,11 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         useSchemaProvider, 100000, false, null, null, "timestamp", null);
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(parquetRecordsCount, tableBasePath, 
sqlContext);
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
 
     prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, 
null);
     deltaStreamer.sync();
-    TestHelpers.assertRecordCount(parquetRecordsCount + 200, tableBasePath, 
sqlContext);
+    assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext);
     testNum++;
   }
 
@@ -2746,7 +2506,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     String tableBasePath = basePath + 
String.format("/configurationHotUpdate_%s_%s", tableType.name(), 
recordType.name());
 
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
-    TestHelpers.addRecordMerger(recordType, cfg.configs);
+    addRecordMerger(recordType, cfg.configs);
     cfg.continuousMode = true;
     cfg.tableType = tableType.name();
     cfg.configHotUpdateStrategyClass = 
MockConfigurationHotUpdateStrategy.class.getName();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
index 528a69a7e91..53e1733c9a6 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerDAGExecution.java
@@ -86,14 +86,14 @@ public class TestHoodieDeltaStreamerDAGExecution extends 
HoodieDeltaStreamerTest
         PARQUET_SOURCE_ROOT, false, "partition_path", "");
     String tableBasePath = basePath + "/runDeltaStreamer" + testNum;
     FileIOUtils.deleteDirectory(new File(tableBasePath));
-    HoodieDeltaStreamer.Config config = 
TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, operationType,
+    HoodieDeltaStreamer.Config config = TestHelpers.makeConfig(tableBasePath, 
operationType,
         ParquetDFSSource.class.getName(), null, PROPS_FILENAME_TEST_PARQUET, 
false,
         useSchemaProvider, 100000, false, null, 
HoodieTableType.MERGE_ON_READ.name(), "timestamp", null);
     configsOpt.ifPresent(cfgs -> config.configs.addAll(cfgs));
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
 
     deltaStreamer.sync();
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(parquetRecordsCount, 
tableBasePath, sqlContext);
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
     testNum++;
 
     if (shouldGenerateUpdates) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index 8a95be0b6cd..e59d23685e7 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -26,11 +26,11 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
-import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.config.SourceTestConfig;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -38,12 +38,14 @@ import 
org.apache.hudi.utilities.testutils.UtilitiesTestBase;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Paths;
@@ -62,37 +64,40 @@ import static 
org.apache.hudi.config.HoodieWriteConfig.FINALIZE_WRITE_PARALLELIS
 import static 
org.apache.hudi.config.HoodieWriteConfig.INSERT_PARALLELISM_VALUE;
 import static 
org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE;
 import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
-import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER;
-import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.addCommitToTimeline;
-import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
-import static 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
 import static 
org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
 
-public class TestHoodieDeltaStreamerWithMultiWriter extends 
SparkClientFunctionalTestHarness {
+public class TestHoodieDeltaStreamerWithMultiWriter extends 
HoodieDeltaStreamerTestBase {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestHoodieDeltaStreamerWithMultiWriter.class);
 
   String basePath;
   String propsFilePath;
   String tableBasePath;
-  
+
+  @BeforeEach
+  public void setup() throws Exception {
+    basePath = UtilitiesTestBase.basePath;
+    super.setupTest();
+  }
+
   @AfterEach
   public void teardown() throws Exception {
     TestDataSource.resetDataGen();
+    FileIOUtils.deleteDirectory(new File(basePath));
   }
 
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   void 
testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType 
tableType) throws Exception {
     // NOTE : Overriding the LockProvider to InProcessLockProvider since 
Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
-    basePath = Paths.get(URI.create(basePath().replaceAll("/$", 
""))).toString();
+    basePath = Paths.get(URI.create(basePath.replaceAll("/$", ""))).toString();
     propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
-    tableBasePath = basePath + "/testtable_" + tableType;
-    prepareInitialConfigs(fs(), basePath, "foo");
-    TypedProperties props = prepareMultiWriterProps(fs(), basePath, 
propsFilePath);
+    tableBasePath = basePath + 
"/testUpsertsContinuousModeWithMultipleWritersForConflicts_" + tableType;
+    prepareInitialConfigs(fs, basePath, "foo");
+    TypedProperties props = prepareMultiWriterProps(fs, basePath, 
propsFilePath);
     props.setProperty("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.InProcessLockProvider");
     
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath);
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
 
@@ -106,18 +111,18 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
       prepJobConfig.configs.add(String.format("%s=3", 
HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key()));
       prepJobConfig.configs.add(String.format("%s=0", 
HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key()));
     }
-    HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, 
jsc());
+    HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc);
 
     // Prepare base dataset with some commits
     deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, 
tableBasePath, fs());
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, 
tableBasePath, fs());
+        TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
       } else {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, 
tableBasePath, fs());
+        TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs);
       }
-      TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, 
tableBasePath, sqlContext());
-      TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, 
tableBasePath, sqlContext());
+      assertRecordCount(totalRecords, tableBasePath, sqlContext);
+      assertDistanceCount(totalRecords, tableBasePath, sqlContext);
       return true;
     });
 
@@ -131,17 +136,17 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
     HoodieDeltaStreamer.Config cfgBackfillJob = 
getDeltaStreamerConfig(tableBasePath, tableType.name(), 
WriteOperationType.UPSERT,
         propsFilePath, 
Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     cfgBackfillJob.continuousMode = false;
-    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tableBasePath).build();
     HoodieTimeline timeline = 
meta.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
         
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), 
HoodieCommitMetadata.class);
     cfgBackfillJob.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
     cfgBackfillJob.configs.add(String.format("%s=%d", 
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
     cfgBackfillJob.configs.add(String.format("%s=false", 
HoodieCleanConfig.AUTO_CLEAN.key()));
-    HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, 
jsc());
+    HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, 
jsc);
 
     // re-init ingestion job to start sync service
-    HoodieDeltaStreamer ingestionJob2 = new 
HoodieDeltaStreamer(cfgIngestionJob, jsc());
+    HoodieDeltaStreamer ingestionJob2 = new 
HoodieDeltaStreamer(cfgIngestionJob, jsc);
 
     // run ingestion & backfill in parallel, create conflict and fail one
     runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
@@ -152,14 +157,14 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
   @EnumSource(HoodieTableType.class)
   void 
testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType 
tableType) throws Exception {
     // NOTE : Overriding the LockProvider to InProcessLockProvider since 
Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
-    basePath = Paths.get(URI.create(basePath().replaceAll("/$", 
""))).toString();
+    basePath = Paths.get(URI.create(basePath.replaceAll("/$", ""))).toString();
     propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
-    tableBasePath = basePath + "/testtable_" + tableType;
-    prepareInitialConfigs(fs(), basePath, "foo");
-    TypedProperties props = prepareMultiWriterProps(fs(), basePath, 
propsFilePath);
+    tableBasePath = basePath + 
"/testUpsertsContinuousModeWithMultipleWritersWithoutConflicts_" + tableType;
+    prepareInitialConfigs(fs, basePath, "foo");
+    TypedProperties props = prepareMultiWriterProps(fs, basePath, 
propsFilePath);
     props.setProperty("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.InProcessLockProvider");
     
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath);
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
 
@@ -168,31 +173,31 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
     prepJobConfig.continuousMode = true;
     prepJobConfig.configs.add(String.format("%s=%d", 
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
     prepJobConfig.configs.add(String.format("%s=false", 
HoodieCleanConfig.AUTO_CLEAN.key()));
-    HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, 
jsc());
+    HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc);
 
     // Prepare base dataset with some commits
     deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, 
tableBasePath, fs());
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, 
tableBasePath, fs());
+        TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
       } else {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, 
tableBasePath, fs());
+        TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs);
       }
-      TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, 
tableBasePath, sqlContext());
-      TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, 
tableBasePath, sqlContext());
+      assertRecordCount(totalRecords, tableBasePath, sqlContext);
+      assertDistanceCount(totalRecords, tableBasePath, sqlContext);
       return true;
     });
 
     // create new ingestion & backfill job config to generate only INSERTS to 
avoid conflict
-    props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
+    props = prepareMultiWriterProps(fs, basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.InProcessLockProvider");
     
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
     props.setProperty("hoodie.test.source.generate.inserts", "true");
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), basePath + "/" + 
PROPS_FILENAME_TEST_MULTI_WRITER);
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/" + 
PROPS_FILENAME_TEST_MULTI_WRITER);
     HoodieDeltaStreamer.Config cfgBackfillJob2 = 
getDeltaStreamerConfig(tableBasePath, tableType.name(), 
WriteOperationType.INSERT,
         propsFilePath, 
Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
     cfgBackfillJob2.continuousMode = false;
-    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tableBasePath).build();
     HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
         
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), 
HoodieCommitMetadata.class);
@@ -206,9 +211,9 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends 
SparkClientFunctiona
     cfgIngestionJob2.configs.add(String.format("%s=%d", 
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
     cfgIngestionJob2.configs.add(String.format("%s=false", 
HoodieCleanConfig.AUTO_CLEAN.key()));
     // re-init ingestion job
-    HoodieDeltaStreamer ingestionJob3 = new 
HoodieDeltaStreamer(cfgIngestionJob2, jsc());
+    HoodieDeltaStreamer ingestionJob3 = new 
HoodieDeltaStreamer(cfgIngestionJob2, jsc);
     // re-init backfill job
-    HoodieDeltaStreamer backfillJob2 = new 
HoodieDeltaStreamer(cfgBackfillJob2, jsc());
+    HoodieDeltaStreamer backfillJob2 = new 
HoodieDeltaStreamer(cfgBackfillJob2, jsc);
 
     // run ingestion & backfill in parallel, avoid conflict and succeed both
     runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob3,
@@ -220,14 +225,14 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
   @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
   void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType 
tableType) throws Exception {
     // NOTE : Overriding the LockProvider to InProcessLockProvider since 
Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
-    basePath = Paths.get(URI.create(basePath().replaceAll("/$", 
""))).toString();
+    basePath = Paths.get(URI.create(basePath.replaceAll("/$", ""))).toString();
     propsFilePath = basePath + "/" + PROPS_FILENAME_TEST_MULTI_WRITER;
-    tableBasePath = basePath + "/testtable_" + tableType;
-    prepareInitialConfigs(fs(), basePath, "foo");
-    TypedProperties props = prepareMultiWriterProps(fs(), basePath, 
propsFilePath);
+    tableBasePath = basePath + 
"/testLatestCheckpointCarryOverWithMultipleWriters_" + tableType;
+    prepareInitialConfigs(fs, basePath, "foo");
+    TypedProperties props = prepareMultiWriterProps(fs, basePath, 
propsFilePath);
     props.setProperty("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.InProcessLockProvider");
     
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath);
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
 
@@ -236,18 +241,18 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
     prepJobConfig.continuousMode = true;
     prepJobConfig.configs.add(String.format("%s=%d", 
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
     prepJobConfig.configs.add(String.format("%s=false", 
HoodieCleanConfig.AUTO_CLEAN.key()));
-    HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, 
jsc());
+    HoodieDeltaStreamer prepJob = new HoodieDeltaStreamer(prepJobConfig, jsc);
 
     // Prepare base dataset with some commits
     deltaStreamerTestRunner(prepJob, prepJobConfig, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommits(3, 
tableBasePath, fs());
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(1, 
tableBasePath, fs());
+        TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
       } else {
-        TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommits(3, 
tableBasePath, fs());
+        TestHelpers.assertAtleastNCompactionCommits(3, tableBasePath, fs);
       }
-      TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, 
tableBasePath, sqlContext());
-      TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, 
tableBasePath, sqlContext());
+      assertRecordCount(totalRecords, tableBasePath, sqlContext);
+      assertDistanceCount(totalRecords, tableBasePath, sqlContext);
       return true;
     });
 
@@ -255,17 +260,17 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
     HoodieDeltaStreamer.Config cfgBackfillJob = 
getDeltaStreamerConfig(tableBasePath, tableType.name(), 
WriteOperationType.UPSERT,
         propsFilePath, 
Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
     cfgBackfillJob.continuousMode = false;
-    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tableBasePath).build();
 
     HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata
         
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), 
HoodieCommitMetadata.class);
 
     // run the backfill job
-    props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
+    props = prepareMultiWriterProps(fs, basePath, propsFilePath);
     props.setProperty("hoodie.write.lock.provider", 
"org.apache.hudi.client.transaction.lock.InProcessLockProvider");
     
props.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY,"3000");
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, propsFilePath);
 
     // get current checkpoint after preparing base dataset with some commits
     HoodieCommitMetadata commitMetadataForLastInstant = 
getLatestMetadata(meta);
@@ -274,7 +279,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends 
SparkClientFunctiona
     cfgBackfillJob.checkpoint = 
commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
     cfgBackfillJob.configs.add(String.format("%s=%d", 
SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), totalRecords));
     cfgBackfillJob.configs.add(String.format("%s=false", 
HoodieCleanConfig.AUTO_CLEAN.key()));
-    HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, 
jsc());
+    HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, 
jsc);
     backfillJob.sync();
 
     meta.reloadActiveTimeline();
@@ -286,7 +291,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends 
SparkClientFunctiona
     verifyCommitMetadataCheckpoint(meta, null);
 
     cfgBackfillJob.checkpoint = null;
-    new HoodieDeltaStreamer(cfgBackfillJob, jsc()).sync(); // if deltastreamer 
checkpoint fetch does not walk back to older commits, this sync will fail
+    new HoodieDeltaStreamer(cfgBackfillJob, jsc).sync(); // if deltastreamer 
checkpoint fetch does not walk back to older commits, this sync will fail
     meta.reloadActiveTimeline();
     Assertions.assertEquals(totalCommits + 2, 
meta.getCommitsTimeline().filterCompletedInstants().countInstants());
     verifyCommitMetadataCheckpoint(meta, "00008");
@@ -309,8 +314,8 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends 
SparkClientFunctiona
 
   private static TypedProperties prepareMultiWriterProps(FileSystem fs, String 
basePath, String propsFilePath) throws IOException {
     TypedProperties props = new TypedProperties();
-    HoodieDeltaStreamerTestBase.populateCommonProps(props, basePath);
-    HoodieDeltaStreamerTestBase.populateCommonHiveProps(props);
+    populateCommonProps(props, basePath);
+    populateCommonHiveProps(props);
 
     props.setProperty("include", "sql-transformer.properties");
     props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
@@ -362,18 +367,18 @@ public class TestHoodieDeltaStreamerWithMultiWriter 
extends SparkClientFunctiona
       HoodieDeltaStreamer ingestionJob, HoodieDeltaStreamer.Config 
cfgIngestionJob, HoodieDeltaStreamer backfillJob,
       HoodieDeltaStreamer.Config cfgBackfillJob, boolean expectConflict, 
String jobId) throws Exception {
     ExecutorService service = Executors.newFixedThreadPool(2);
-    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tableBasePath).build();
     HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     String lastSuccessfulCommit = timeline.lastInstant().get().getTimestamp();
     // Condition for parallel ingestion job
     Function<Boolean, Boolean> conditionForRegularIngestion = (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, 
lastSuccessfulCommit, tableBasePath, fs());
+        TestHelpers.assertAtleastNDeltaCommitsAfterCommit(3, 
lastSuccessfulCommit, tableBasePath, fs);
       } else {
-        
TestHoodieDeltaStreamer.TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3,
 lastSuccessfulCommit, tableBasePath, fs());
+        TestHelpers.assertAtleastNCompactionCommitsAfterCommit(3, 
lastSuccessfulCommit, tableBasePath, fs);
       }
-      TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(totalRecords, 
tableBasePath, sqlContext());
-      TestHoodieDeltaStreamer.TestHelpers.assertDistanceCount(totalRecords, 
tableBasePath, sqlContext());
+      assertRecordCount(totalRecords, tableBasePath, sqlContext);
+      assertDistanceCount(totalRecords, tableBasePath, sqlContext);
       return true;
     };
 
@@ -445,7 +450,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends 
SparkClientFunctiona
     GetCommitsAfterInstant(String basePath, String lastSuccessfulCommit) {
       this.basePath = basePath;
       this.lastSuccessfulCommit = lastSuccessfulCommit;
-      meta = 
HoodieTableMetaClient.builder().setConf(fs().getConf()).setBasePath(basePath).build();
+      meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
     }
 
     long getCommitsAfterInstant() {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index 9c858dd475a..a8ee0c694fd 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -193,8 +193,8 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
     String targetBasePath2 = 
executionContexts.get(1).getConfig().targetBasePath;
     streamer.sync();
 
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, 
sqlContext);
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, 
sqlContext);
+    assertRecordCount(5, targetBasePath1, sqlContext);
+    assertRecordCount(10, targetBasePath2, sqlContext);
 
     //insert updates for already existing records in kafka topics
     testUtils.sendMessages(topicName1, 
Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA)));
@@ -209,8 +209,8 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
     assertTrue(streamer.getFailedTables().isEmpty());
 
     //assert the record count matches now
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, 
sqlContext);
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, 
sqlContext);
+    assertRecordCount(5, targetBasePath1, sqlContext);
+    assertRecordCount(10, targetBasePath2, sqlContext);
     testNum++;
   }
 
@@ -307,7 +307,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
 
   private void syncAndVerify(HoodieMultiTableDeltaStreamer streamer, String 
targetBasePath1, String targetBasePath2, long table1ExpectedRecords, long 
table2ExpectedRecords) {
     streamer.sync();
-    
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table1ExpectedRecords, 
targetBasePath1, sqlContext);
-    
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(table2ExpectedRecords, 
targetBasePath2, sqlContext);
+    assertRecordCount(table1ExpectedRecords, targetBasePath1, sqlContext);
+    assertRecordCount(table2ExpectedRecords, targetBasePath2, sqlContext);
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
index e941aff8c04..888f5ebc2de 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestTransformer.java
@@ -59,7 +59,7 @@ public class TestTransformer extends 
HoodieDeltaStreamerTestBase {
         PARQUET_SOURCE_ROOT, false, "partition_path", "");
     String tableBasePath = basePath + 
"/testMultipleTransformersWithIdentifiers" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
-        TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+        HoodieDeltaStreamerTestBase.TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
             transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
             useSchemaProvider, 100000, false, null, null, "timestamp", null), 
jsc);
 
@@ -78,7 +78,7 @@ public class TestTransformer extends 
HoodieDeltaStreamerTestBase {
     properties.setProperty("transformer.suffix", ".1,.2,.3");
     deltaStreamer.sync();
 
-    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(parquetRecordsCount, 
tableBasePath, sqlContext);
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
     assertEquals(0, 
sqlContext.read().format("org.apache.hudi").load(tableBasePath).where("timestamp
 != 110").count());
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index 058ed72a3be..24f645c404a 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -118,6 +118,7 @@ public class UtilitiesTestBase {
   protected static HoodieSparkEngineContext context;
   protected static SparkSession sparkSession;
   protected static SQLContext sqlContext;
+  protected static Configuration hadoopConf;
 
   @BeforeAll
   public static void setLogLevel() {
@@ -131,7 +132,7 @@ public class UtilitiesTestBase {
   }
 
   public static void initTestServices(boolean needsHdfs, boolean needsHive, 
boolean needsZookeeper) throws Exception {
-    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
     hadoopConf.set("hive.exec.scratchdir", System.getenv("java.io.tmpdir") + 
"/hive");
 
     if (needsHdfs) {

Reply via email to