This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fe35fb895 [HUDI-5145] Avoid starting HDFS in hudi-utilities tests (#7171)
7fe35fb895 is described below

commit 7fe35fb895fb58f0ea599974defd7ffe310a964e
Author: Shiyan Xu <[email protected]>
AuthorDate: Thu Nov 10 12:23:24 2022 +0800

    [HUDI-5145] Avoid starting HDFS in hudi-utilities tests (#7171)
---
 .../testutils/minicluster/HdfsTestService.java     |   8 +-
 .../TestDFSHoodieTestSuiteWriterAdapter.java       |   8 +-
 .../integ/testsuite/TestFileDeltaInputWriter.java  |  10 +-
 .../testsuite/job/TestHoodieTestSuiteJob.java      |  74 ++---
 .../reader/TestDFSAvroDeltaInputReader.java        |  10 +-
 .../reader/TestDFSHoodieDatasetInputReader.java    |   6 +-
 .../functional/HoodieDeltaStreamerTestBase.java    |  10 +-
 .../functional/TestHoodieDeltaStreamer.java        | 314 ++++++++++-----------
 .../TestHoodieMultiTableDeltaStreamer.java         |  40 +--
 .../hudi/utilities/sources/TestAvroDFSSource.java  |   4 +-
 .../hudi/utilities/sources/TestCsvDFSSource.java   |   4 +-
 .../utilities/sources/TestGcsEventsSource.java     |   2 +-
 .../hudi/utilities/sources/TestJsonDFSSource.java  |   4 +-
 .../utilities/sources/TestParquetDFSSource.java    |   2 +-
 .../hudi/utilities/sources/TestS3EventsSource.java |   4 +-
 .../hudi/utilities/sources/TestSqlSource.java      |  11 +-
 .../debezium/TestAbstractDebeziumSource.java       |   2 +-
 .../utilities/testutils/UtilitiesTestBase.java     |  61 ++--
 .../AbstractCloudObjectsSourceTestBase.java        |   2 +-
 .../sources/AbstractDFSSourceTestBase.java         |   6 +-
 .../transform/TestSqlFileBasedTransformer.java     |  23 +-
 21 files changed, 313 insertions(+), 292 deletions(-)

diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
index 0766c61c67..727e1e4db6 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/HdfsTestService.java
@@ -53,8 +53,12 @@ public class HdfsTestService {
   private MiniDFSCluster miniDfsCluster;
 
   public HdfsTestService() throws IOException {
-    hadoopConf = new Configuration();
-    workDir = Files.createTempDirectory("temp").toAbsolutePath().toString();
+    this(new Configuration());
+  }
+
+  public HdfsTestService(Configuration hadoopConf) throws IOException {
+    this.hadoopConf = hadoopConf;
+    this.workDir = 
Files.createTempDirectory("temp").toAbsolutePath().toString();
   }
 
   public Configuration getHadoopConf() {
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
index 2b69a319a5..9c21ee6bd4 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java
@@ -65,7 +65,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends 
UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices(true, false, false);
   }
 
   @AfterAll
@@ -131,15 +131,15 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends 
UtilitiesTestBase {
   // TODO(HUDI-3668): Fix this test
   public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws 
IOException {
     DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, 
DeltaInputType.AVRO,
-        new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, 
dfsBasePath,
+        new SerializableConfiguration(jsc.hadoopConfiguration()), basePath, 
basePath,
         schemaProvider.getSourceSchema().toString(), 10240L, 
jsc.defaultParallelism(), false, false);
     DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter = 
DeltaWriterFactory
         .getDeltaWriterAdapter(dfsSinkConfig, 1);
     FlexibleSchemaRecordGenerationIterator itr = new 
FlexibleSchemaRecordGenerationIterator(1000,
         schemaProvider.getSourceSchema().toString());
     dfsDeltaWriterAdapter.write(itr);
-    FileSystem fs = FSUtils.getFs(dfsBasePath, jsc.hadoopConfiguration());
-    FileStatus[] fileStatuses = fs.listStatus(new Path(dfsBasePath));
+    FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+    FileStatus[] fileStatuses = fs.listStatus(new Path(basePath));
     // Since maxFileSize was 10240L and we produced 1K records each close to 
1K size, we should produce more than
     // 1 file
     assertTrue(fileStatuses.length > 0);
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
index f3cda10a62..36c88d0242 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestFileDeltaInputWriter.java
@@ -58,7 +58,7 @@ public class TestFileDeltaInputWriter extends 
UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices(true, false, false);
   }
 
   @AfterAll
@@ -82,7 +82,7 @@ public class TestFileDeltaInputWriter extends 
UtilitiesTestBase {
   public void testAvroFileSinkWriter() throws IOException {
     // 1. Create a Avro File Sink Writer
     DeltaInputWriter<GenericRecord> fileSinkWriter =
-        new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), dfsBasePath + 
"/input", schemaProvider.getSourceSchema()
+        new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), basePath + 
"/input", schemaProvider.getSourceSchema()
             .toString(), 1024 * 1024L);
     GenericRecordFullPayloadGenerator payloadGenerator =
         new 
GenericRecordFullPayloadGenerator(schemaProvider.getSourceSchema());
@@ -96,7 +96,7 @@ public class TestFileDeltaInputWriter extends 
UtilitiesTestBase {
     });
     fileSinkWriter.close();
     DeltaWriteStats deltaWriteStats = fileSinkWriter.getDeltaWriteStats();
-    FileSystem fs = FSUtils.getFs(dfsBasePath, jsc.hadoopConfiguration());
+    FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
     FileStatus[] fileStatuses = fs.listStatus(new 
Path(deltaWriteStats.getFilePath()));
     // Atleast 1 file was written
     assertEquals(1, fileStatuses.length);
@@ -104,7 +104,7 @@ public class TestFileDeltaInputWriter extends 
UtilitiesTestBase {
     assertTrue(fileStatuses[0].getLen() > 0);
     // File length should be the same as the number of bytes written
     assertTrue(deltaWriteStats.getBytesWritten() > 0);
-    List<String> paths = Arrays.asList(fs.globStatus(new Path(dfsBasePath + 
"/*/*.avro")))
+    List<String> paths = Arrays.asList(fs.globStatus(new Path(basePath + 
"/*/*.avro")))
         .stream().map(f -> 
f.getPath().toString()).collect(Collectors.toList());
     JavaRDD<GenericRecord> writtenRecords =
         SparkBasedReader.readAvro(sparkSession, 
schemaProvider.getSourceSchema().toString(), paths, Option.empty(),
@@ -119,7 +119,7 @@ public class TestFileDeltaInputWriter extends 
UtilitiesTestBase {
   public void testAvroFileSinkCreateNewWriter() throws IOException {
     // 1. Create a Avro File Sink Writer
     DeltaInputWriter<GenericRecord> fileSinkWriter =
-        new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), dfsBasePath,
+        new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), basePath,
             schemaProvider.getSourceSchema().toString(),
             1024 * 1024L);
     GenericRecordFullPayloadGenerator payloadGenerator =
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index ddf5b07247..0ce0076f88 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -94,30 +94,30 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(true, true);
+    UtilitiesTestBase.initTestServices(true, true, true);
     // prepare the configs.
     
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
 + "/.."
-        + BASE_PROPERTIES_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath + 
"/base.properties");
+        + BASE_PROPERTIES_DOCKER_DEMO_RELATIVE_PATH, fs, basePath + 
"/base.properties");
     
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
 + "/.."
-        + SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath + 
"/source.avsc");
+        + SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, fs, basePath + 
"/source.avsc");
     
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
 + "/.."
-        + TARGET_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath + 
"/target.avsc");
+        + TARGET_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, fs, basePath + 
"/target.avsc");
 
     
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
 + "/.."
-        + COW_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + COW_DAG_FILE_NAME);
+        + COW_DAG_SOURCE_PATH, fs, basePath + "/" + COW_DAG_FILE_NAME);
     
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
 + "/.."
-        + MOR_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + MOR_DAG_FILE_NAME);
+        + MOR_DAG_SOURCE_PATH, fs, basePath + "/" + MOR_DAG_FILE_NAME);
 
     TypedProperties props = getProperties();
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + 
"/test-source"
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + 
"/test-source"
         + ".properties");
 
     
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
 + "/.."
-        + COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, dfs, dfsBasePath + "/" 
+ COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
-    UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), dfs, dfsBasePath 
+ "/test-source"
+        + COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, fs, basePath + "/" + 
COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
+    UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), fs, basePath + 
"/test-source"
         + ".properties");
     
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir")
 + "/.."
-        + SPARK_SQL_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + 
SPARK_SQL_DAG_FILE_NAME);
+        + SPARK_SQL_DAG_SOURCE_PATH, fs, basePath + "/" + 
SPARK_SQL_DAG_FILE_NAME);
 
     // Properties used for the delta-streamer which incrementally pulls from 
upstream DFS Avro source and
     // writes to downstream hudi table
@@ -127,10 +127,10 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"timestamp");
 
     // Source schema is the target schema of upstream table
-    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source.avsc");
-    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/source.avsc");
-    UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
-        dfsBasePath + "/test-downstream-source.properties");
+    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 basePath + "/source.avsc");
+    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 basePath + "/source.avsc");
+    UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, fs,
+        basePath + "/test-downstream-source.properties");
     // these tests cause a lot of log verbosity from spark, turning it down
     Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
   }
@@ -151,8 +151,8 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
   }
 
   private void cleanDFSDirs() throws Exception {
-    dfs.delete(new Path(dfsBasePath + "/input"), true);
-    dfs.delete(new Path(dfsBasePath + "/result"), true);
+    fs.delete(new Path(basePath + "/input"), true);
+    fs.delete(new Path(basePath + "/result"), true);
   }
 
   private static TypedProperties getProperties() {
@@ -161,9 +161,9 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     props.setProperty("hoodie.datasource.write.partitionpath.field", 
"timestamp");
     props.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", 
"UNIX_TIMESTAMP");
     
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyy/MM/dd");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source.avsc");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/source.avsc");
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + 
"/input");
+    
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/source.avsc");
+    
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
basePath + "/source.avsc");
+    props.setProperty("hoodie.deltastreamer.source.dfs.root", basePath + 
"/input");
     props.setProperty("hoodie.datasource.hive_sync.assume_date_partitioning", 
"true");
     props.setProperty("hoodie.datasource.hive_sync.skip_ro_suffix", "true");
     props.setProperty("hoodie.datasource.write.keytranslator.class", 
"org.apache.hudi"
@@ -205,8 +205,8 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
   @MethodSource("configParams")
   public void testDagWithInsertUpsertAndValidate(boolean useDeltaStreamer, 
String tableType) throws Exception {
     this.cleanDFSDirs();
-    String inputBasePath = dfsBasePath + "/input/" + 
UUID.randomUUID().toString();
-    String outputBasePath = dfsBasePath + "/result/" + 
UUID.randomUUID().toString();
+    String inputBasePath = basePath + "/input/" + UUID.randomUUID().toString();
+    String outputBasePath = basePath + "/result/" + 
UUID.randomUUID().toString();
     HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, 
useDeltaStreamer, tableType);
     cfg.workloadDagGenerator = ComplexDagGenerator.class.getName();
     HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
@@ -220,8 +220,8 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     boolean useDeltaStreamer = false;
     String tableType = "COPY_ON_WRITE";
     this.cleanDFSDirs();
-    String inputBasePath = dfsBasePath + "/input";
-    String outputBasePath = dfsBasePath + "/result";
+    String inputBasePath = basePath + "/input";
+    String outputBasePath = basePath + "/result";
     HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, 
useDeltaStreamer, tableType);
     if (tableType == HoodieTableType.COPY_ON_WRITE.name()) {
       cfg.workloadDagGenerator = HiveSyncDagGenerator.class.getName();
@@ -238,11 +238,11 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
   public void testCOWFullDagFromYaml() throws Exception {
     boolean useDeltaStreamer = false;
     this.cleanDFSDirs();
-    String inputBasePath = dfsBasePath + "/input";
-    String outputBasePath = dfsBasePath + "/result";
+    String inputBasePath = basePath + "/input";
+    String outputBasePath = basePath + "/result";
     HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, 
useDeltaStreamer, HoodieTableType
         .COPY_ON_WRITE.name());
-    cfg.workloadYamlPath = dfsBasePath + "/" + COW_DAG_FILE_NAME;
+    cfg.workloadYamlPath = basePath + "/" + COW_DAG_FILE_NAME;
     HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
     hoodieTestSuiteJob.runTestSuite();
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new 
Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -253,11 +253,11 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
   public void testMORFullDagFromYaml() throws Exception {
     boolean useDeltaStreamer = false;
     this.cleanDFSDirs();
-    String inputBasePath = dfsBasePath + "/input";
-    String outputBasePath = dfsBasePath + "/result";
+    String inputBasePath = basePath + "/input";
+    String outputBasePath = basePath + "/result";
     HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, 
useDeltaStreamer, HoodieTableType
         .MERGE_ON_READ.name());
-    cfg.workloadYamlPath = dfsBasePath + "/" + MOR_DAG_FILE_NAME;
+    cfg.workloadYamlPath = basePath + "/" + MOR_DAG_FILE_NAME;
     HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
     hoodieTestSuiteJob.runTestSuite();
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new 
Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -272,13 +272,13 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     TypedProperties props = getProperties();
     props.setProperty("hoodie.write.concurrency.mode", 
"optimistic_concurrency_control");
     props.setProperty("hoodie.failed.writes.cleaner.policy", "LAZY");
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + 
"/test-source"
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + 
"/test-source"
         + ".properties");
-    String inputBasePath = dfsBasePath + "/input";
-    String outputBasePath = dfsBasePath + "/result";
+    String inputBasePath = basePath + "/input";
+    String outputBasePath = basePath + "/result";
     HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, 
useDeltaStreamer, HoodieTableType
         .COPY_ON_WRITE.name());
-    cfg.workloadYamlPath = dfsBasePath + "/" + 
COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES;
+    cfg.workloadYamlPath = basePath + "/" + 
COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES;
     HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
     hoodieTestSuiteJob.runTestSuite();
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new 
Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -289,11 +289,11 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
   public void testSparkSqlDag() throws Exception {
     boolean useDeltaStreamer = false;
     this.cleanDFSDirs();
-    String inputBasePath = dfsBasePath + "/input";
-    String outputBasePath = dfsBasePath + "/result";
+    String inputBasePath = basePath + "/input";
+    String outputBasePath = basePath + "/result";
     HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, 
useDeltaStreamer, HoodieTableType
         .COPY_ON_WRITE.name());
-    cfg.workloadYamlPath = dfsBasePath + "/" + SPARK_SQL_DAG_FILE_NAME;
+    cfg.workloadYamlPath = basePath + "/" + SPARK_SQL_DAG_FILE_NAME;
     HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
     hoodieTestSuiteJob.runTestSuite();
   }
@@ -307,7 +307,7 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     cfg.tableType = tableType;
     cfg.sourceClassName = AvroDFSSource.class.getName();
     cfg.sourceOrderingField = SchemaUtils.SOURCE_ORDERING_FIELD;
-    cfg.propsFilePath = dfsBasePath + "/test-source.properties";
+    cfg.propsFilePath = basePath + "/test-source.properties";
     cfg.outputTypeName = DeltaOutputMode.DFS.name();
     cfg.inputFormatName = DeltaInputType.AVRO.name();
     cfg.limitFileSize = 1024 * 1024L;
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
index 9f9439f376..631601fb2c 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSAvroDeltaInputReader.java
@@ -43,7 +43,7 @@ public class TestDFSAvroDeltaInputReader extends 
UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices(true, false, false);
   }
 
   @AfterAll
@@ -59,12 +59,12 @@ public class TestDFSAvroDeltaInputReader extends 
UtilitiesTestBase {
   @Test
   @Disabled
   public void testDFSSinkReader() throws IOException {
-    FileSystem fs = FSUtils.getFs(dfsBasePath, new Configuration());
+    FileSystem fs = FSUtils.getFs(basePath, new Configuration());
     // Create 10 avro files with 10 records each
-    TestUtils.createAvroFiles(jsc, sparkSession, dfsBasePath, 10, 10);
-    FileStatus[] statuses = fs.globStatus(new Path(dfsBasePath + "/*/*.avro"));
+    TestUtils.createAvroFiles(jsc, sparkSession, basePath, 10, 10);
+    FileStatus[] statuses = fs.globStatus(new Path(basePath + "/*/*.avro"));
     DFSAvroDeltaInputReader reader =
-        new DFSAvroDeltaInputReader(sparkSession, 
TestUtils.getSchema().toString(), dfsBasePath, Option.empty(),
+        new DFSAvroDeltaInputReader(sparkSession, 
TestUtils.getSchema().toString(), basePath, Option.empty(),
             Option.empty());
     assertEquals(reader.analyzeSingleFile(statuses[0].getPath().toString()), 
5);
     assertEquals(reader.read(100).count(), 100);
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
index 80f6e2548c..cf2921d41e 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/reader/TestDFSHoodieDatasetInputReader.java
@@ -51,7 +51,7 @@ public class TestDFSHoodieDatasetInputReader extends 
UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices(true, false, false);
   }
 
   @AfterAll
@@ -62,7 +62,7 @@ public class TestDFSHoodieDatasetInputReader extends 
UtilitiesTestBase {
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), dfsBasePath);
+    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
   }
 
   @AfterEach
@@ -117,7 +117,7 @@ public class TestDFSHoodieDatasetInputReader extends 
UtilitiesTestBase {
 
   private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() throws 
Exception {
     // Prepare the AvroParquetIO
-    return HoodieWriteConfig.newBuilder().withPath(dfsBasePath)
+    return HoodieWriteConfig.newBuilder().withPath(basePath)
         .withParallelism(2, 2)
         .withDeleteParallelism(2)
         .withSchema(HoodieTestDataGenerator
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
index ad74235ae0..1d85ba0eae 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
@@ -104,14 +104,14 @@ public class HoodieDeltaStreamerTestBase extends 
UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(true, true);
-    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
-    ORC_SOURCE_ROOT = dfsBasePath + "/orcFiles";
-    JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
+    UtilitiesTestBase.initTestServices(false, true, true);
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
+    ORC_SOURCE_ROOT = basePath + "/orcFiles";
+    JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
     testUtils = new KafkaTestUtils();
     testUtils.setup();
     topicName = "topic" + testNum;
-    prepareInitialConfigs(dfs, dfsBasePath, testUtils.brokerAddress());
+    prepareInitialConfigs(fs, basePath, testUtils.brokerAddress());
 
     prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
     prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index e1c296617a..5c1365ad4f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -250,7 +250,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
       cfg.sourceOrderingField = sourceOrderingField;
-      cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
+      cfg.propsFilePath = UtilitiesTestBase.basePath + "/" + propsFilename;
       cfg.sourceLimit = sourceLimit;
       cfg.checkpoint = checkpoint;
       if (updatePayloadClass) {
@@ -272,7 +272,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       cfg.sourceClassName = HoodieIncrSource.class.getName();
       cfg.operation = op;
       cfg.sourceOrderingField = "timestamp";
-      cfg.propsFilePath = dfsBasePath + "/test-downstream-source.properties";
+      cfg.propsFilePath = UtilitiesTestBase.basePath + 
"/test-downstream-source.properties";
       cfg.sourceLimit = 1000;
       if (null != schemaProviderClassName) {
         cfg.schemaProviderClassName = schemaProviderClassName;
@@ -448,7 +448,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @Test
   public void testProps() {
     TypedProperties props =
-        new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + 
"/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
+        new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/" + 
PROPS_FILENAME_TEST_SOURCE)).getProps();
     assertEquals(2, props.getInteger("hoodie.upsert.shuffle.parallelism"));
     assertEquals("_row_key", 
props.getString("hoodie.datasource.write.recordkey.field"));
     
assertEquals("org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestGenerator",
@@ -561,30 +561,30 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testKafkaConnectCheckpointProvider() throws IOException {
-    String tableBasePath = dfsBasePath + "/test_table";
-    String bootstrapPath = dfsBasePath + "/kafka_topic1";
+    String tableBasePath = basePath + "/test_table";
+    String bootstrapPath = basePath + "/kafka_topic1";
     String partitionPath = bootstrapPath + "/year=2016/month=05/day=01";
     String filePath = partitionPath + "/kafka_topic1+0+100+200.parquet";
     String checkpointProviderClass = 
"org.apache.hudi.utilities.checkpointing.KafkaConnectHdfsProvider";
     HoodieDeltaStreamer.Config cfg = 
TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
     TypedProperties props =
-        new DFSPropertiesConfiguration(dfs.getConf(), new Path(dfsBasePath + 
"/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
+        new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/" + 
PROPS_FILENAME_TEST_SOURCE)).getProps();
     props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
     cfg.initialCheckpointProvider = checkpointProviderClass;
     // create regular kafka connect hdfs dirs
-    dfs.mkdirs(new Path(bootstrapPath));
-    dfs.mkdirs(new Path(partitionPath));
+    fs.mkdirs(new Path(bootstrapPath));
+    fs.mkdirs(new Path(partitionPath));
     // generate parquet files using kafka connect naming convention
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     
Helpers.saveParquetToDFS(Helpers.toGenericRecords(dataGenerator.generateInserts("000",
 100)), new Path(filePath));
-    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, dfs, 
hdfsTestService.getHadoopConf(), Option.ofNullable(props));
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc, fs, 
jsc.hadoopConfiguration(), Option.ofNullable(props));
     assertEquals("kafka_topic1,0:200", deltaStreamer.getConfig().checkpoint);
   }
 
   @Test
   public void testPropsWithInvalidKeyGenerator() throws Exception {
     Exception e = assertThrows(IOException.class, () -> {
-      String tableBasePath = dfsBasePath + "/test_table_invalid_key_gen";
+      String tableBasePath = basePath + "/test_table_invalid_key_gen";
       HoodieDeltaStreamer deltaStreamer =
           new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT,
               
Collections.singletonList(TripsWithDistanceTransformer.class.getName()), 
PROPS_FILENAME_TEST_INVALID, false), jsc);
@@ -598,9 +598,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @Test
   public void testTableCreation() throws Exception {
     Exception e = assertThrows(TableNotFoundException.class, () -> {
-      dfs.mkdirs(new Path(dfsBasePath + "/not_a_table"));
+      fs.mkdirs(new Path(basePath + "/not_a_table"));
       HoodieDeltaStreamer deltaStreamer =
-          new HoodieDeltaStreamer(TestHelpers.makeConfig(dfsBasePath + 
"/not_a_table", WriteOperationType.BULK_INSERT), jsc);
+          new HoodieDeltaStreamer(TestHelpers.makeConfig(basePath + 
"/not_a_table", WriteOperationType.BULK_INSERT), jsc);
       deltaStreamer.sync();
     }, "Should error out when pointed out at a dir thats not a table");
     // expected
@@ -609,7 +609,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testBulkInsertsAndUpsertsWithBootstrap() throws Exception {
-    String tableBasePath = dfsBasePath + "/test_table";
+    String tableBasePath = basePath + "/test_table";
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
@@ -627,7 +627,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertEquals(1950, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
     // Perform bootstrap with tableBasePath as source
-    String bootstrapSourcePath = dfsBasePath + "/src_bootstrapped";
+    String bootstrapSourcePath = basePath + "/src_bootstrapped";
     Dataset<Row> sourceDf = sqlContext.read()
         .format("org.apache.hudi")
         .load(tableBasePath);
@@ -635,7 +635,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     //  in the value.  Currently it fails the tests due to slash encoding.
     
sourceDf.write().format("parquet").partitionBy("rider").save(bootstrapSourcePath);
 
-    String newDatasetBasePath = dfsBasePath + "/test_dataset_bootstrapped";
+    String newDatasetBasePath = basePath + "/test_dataset_bootstrapped";
     cfg.runBootstrap = true;
     cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", 
bootstrapSourcePath));
     cfg.configs.add(String.format("%s=%s", 
DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
@@ -661,7 +661,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testModifiedTableConfigs() throws Exception {
-    String tableBasePath = dfsBasePath + "/test_table_modified_configs";
+    String tableBasePath = basePath + "/test_table_modified_configs";
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
@@ -693,32 +693,32 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata(metadata, tableBasePath, dfs, 
totalCommits);
+    TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs, 
totalCommits);
   }
 
   @ParameterizedTest
   @MethodSource("schemaEvolArgs")
   public void testSchemaEvolution(String tableType, boolean 
useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
-    String tableBasePath = dfsBasePath + "/test_table_schema_evolution" + 
tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor;
+    String tableBasePath = basePath + "/test_table_schema_evolution" + 
tableType + "_" + useUserProvidedSchema + "_" + useSchemaPostProcessor;
     defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
     // Insert data produced with Schema A, pass Schema A
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ dfsBasePath + "/source.avsc");
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" 
+ dfsBasePath + "/source.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ basePath + "/source.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" 
+ basePath + "/source.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     if (!useSchemaPostProcessor) {
       
cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE
 + "=false");
     }
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // Upsert data produced with Schema B, pass Schema B
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ dfsBasePath + "/source.avsc");
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" 
+ dfsBasePath + "/source_evolved.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ basePath + "/source.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" 
+ basePath + "/source_evolved.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     if (!useSchemaPostProcessor) {
       
cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE
 + "=false");
@@ -726,7 +726,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     TestHelpers.assertRecordCount(1450, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
     List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1450, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
@@ -741,9 +741,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ dfsBasePath + "/source.avsc");
+    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" 
+ basePath + "/source.avsc");
     if (useUserProvidedSchema) {
-      
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + 
dfsBasePath + "/source_evolved.avsc");
+      
cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + 
basePath + "/source_evolved.avsc");
     }
     if (!useSchemaPostProcessor) {
       
cfg.configs.add(SparkAvroPostProcessor.Config.SPARK_AVRO_POST_PROCESSOR_PROP_ENABLE
 + "=false");
@@ -752,15 +752,15 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     // again, 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     TestHelpers.assertRecordCount(1900, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00002", tableBasePath, dfs, 3);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
     counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
-    TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(dfs.getConf()).build());
+    TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
     Schema tableSchema = 
tableSchemaResolver.getTableAvroSchemaWithoutMetadataFields();
     assertNotNull(tableSchema);
 
-    Schema expectedSchema = new Schema.Parser().parse(dfs.open(new 
Path(dfsBasePath + "/source_evolved.avsc")));
+    Schema expectedSchema = new Schema.Parser().parse(fs.open(new 
Path(basePath + "/source_evolved.avsc")));
     if (!useUserProvidedSchema || useSchemaPostProcessor) {
       expectedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
           AvroConversionUtils.convertAvroSchemaToStructType(expectedSchema), 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
@@ -768,8 +768,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     assertEquals(tableSchema, expectedSchema);
 
     // clean up and reinit
-    
UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(cfg.targetBasePath, 
jsc.hadoopConfiguration()), dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE);
-    writeCommonPropsToFile(dfs, dfsBasePath);
+    
UtilitiesTestBase.Helpers.deleteFileFromDfs(FSUtils.getFs(cfg.targetBasePath, 
jsc.hadoopConfiguration()), basePath + "/" + PROPS_FILENAME_TEST_SOURCE);
+    writeCommonPropsToFile(fs, basePath);
     defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
   }
 
@@ -780,7 +780,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
-    String tableBasePath = dfsBasePath  + "/non_continuous_cow";
+    String tableBasePath = basePath + "/non_continuous_cow";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
@@ -804,7 +804,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
-    String tableBasePath = dfsBasePath  + "/non_continuous_mor";
+    String tableBasePath = basePath + "/non_continuous_mor";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
     cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
@@ -821,7 +821,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   private void testUpsertsContinuousMode(HoodieTableType tableType, String 
tempDir, boolean testShutdownGracefully) throws Exception {
-    String tableBasePath = dfsBasePath + "/" + tempDir;
+    String tableBasePath = basePath + "/" + tempDir;
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
     // Initial bulk insert
@@ -836,10 +836,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
       if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, dfs);
-        TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
+        TestHelpers.assertAtleastNDeltaCommits(5, tableBasePath, fs);
+        TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);
       } else {
-        TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, dfs);
+        TestHelpers.assertAtleastNCompactionCommits(5, tableBasePath, fs);
       }
       TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
       TestHelpers.assertDistanceCount(totalRecords, tableBasePath, sqlContext);
@@ -902,7 +902,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @ParameterizedTest
   @ValueSource(strings = {"true", "false"})
   public void testInlineClustering(String preserveCommitMetadata) throws 
Exception {
-    String tableBasePath = dfsBasePath + "/inlineClustering";
+    String tableBasePath = basePath + "/inlineClustering";
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
 
@@ -913,15 +913,15 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "true", 
"2", "", "", preserveCommitMetadata));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
-      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
       return true;
     });
   }
 
   @Test
   public void testDeltaSyncWithPendingClustering() throws Exception {
-    String tableBasePath = dfsBasePath + "/inlineClusteringPending";
+    String tableBasePath = basePath + "/inlineClusteringPending";
     // ingest data
     int totalRecords = 2000;
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT);
@@ -930,12 +930,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
     // assert ingest successful
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
 
     // schedule a clustering job to build a clustering plan and transition to 
inflight
     HoodieClusteringJob clusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
     clusteringJob.cluster(0);
-    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
     List<HoodieInstant> hoodieClusteringInstants = 
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
     HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
     
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
 Option.empty());
@@ -947,14 +947,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     ds2.sync();
     String completeClusteringTimeStamp = 
meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
     assertEquals(clusteringRequest.getTimestamp(), 
completeClusteringTimeStamp);
-    TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
-    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
   }
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) 
throws Exception {
-    String tableBasePath = dfsBasePath + 
"/cleanerDeleteReplacedDataWithArchive" + asyncClean;
+    String tableBasePath = basePath + "/cleanerDeleteReplacedDataWithArchive" 
+ asyncClean;
 
     int totalRecords = 3000;
 
@@ -969,15 +969,15 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", 
HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
       return true;
     });
 
-    TestHelpers.assertAtLeastNCommits(6, tableBasePath, dfs);
-    TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNCommits(6, tableBasePath, fs);
+    TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
 
     // Step 2 : Get the first replacecommit and extract the corresponding 
replaced file IDs.
-    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
     HoodieTimeline replacedTimeline = 
meta.reloadActiveTimeline().getCompletedReplaceTimeline();
     Option<HoodieInstant> firstReplaceHoodieInstant = 
replacedTimeline.nthFromLastInstant(1);
     assertTrue(firstReplaceHoodieInstant.isPresent());
@@ -1087,7 +1087,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     config.basePath = basePath;
     config.clusteringInstantTime = clusteringInstantTime;
     config.runSchedule = runSchedule;
-    config.propsFilePath = dfsBasePath + "/clusteringjob.properties";
+    config.propsFilePath = UtilitiesTestBase.basePath + 
"/clusteringjob.properties";
     config.runningMode = runningMode;
     if (retryLastFailedClusteringJob != null) {
       config.retryLastFailedClusteringJob = retryLastFailedClusteringJob;
@@ -1104,7 +1104,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     config.basePath = basePath;
     config.tableName = tableName;
     config.indexInstantTime = indexInstantTime;
-    config.propsFilePath = dfsBasePath + "/indexer.properties";
+    config.propsFilePath = UtilitiesTestBase.basePath + "/indexer.properties";
     config.runningMode = runningMode;
     config.indexTypes = indexTypes;
     return config;
@@ -1112,11 +1112,11 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testHoodieIndexer() throws Exception {
-    String tableBasePath = dfsBasePath + "/asyncindexer";
+    String tableBasePath = basePath + "/asyncindexer";
     HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 1000, 
"false");
 
     deltaStreamerTestRunner(ds, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
 
       Option<String> scheduleIndexInstantTime = Option.empty();
       try {
@@ -1128,13 +1128,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         return false;
       }
       if (scheduleIndexInstantTime.isPresent()) {
-        TestHelpers.assertPendingIndexCommit(tableBasePath, dfs);
+        TestHelpers.assertPendingIndexCommit(tableBasePath, fs);
         LOG.info("Schedule indexing success, now build index with instant time 
" + scheduleIndexInstantTime.get());
         HoodieIndexer runIndexingJob = new HoodieIndexer(jsc,
             buildIndexerConfig(tableBasePath, ds.getConfig().targetTableName, 
scheduleIndexInstantTime.get(), EXECUTE, "COLUMN_STATS"));
         runIndexingJob.start(0);
         LOG.info("Metadata indexing success");
-        TestHelpers.assertCompletedIndexCommit(tableBasePath, dfs);
+        TestHelpers.assertCompletedIndexCommit(tableBasePath, fs);
       } else {
         LOG.warn("Metadata indexing failed");
       }
@@ -1145,12 +1145,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testHoodieAsyncClusteringJob(boolean 
shouldPassInClusteringInstantTime) throws Exception {
-    String tableBasePath = dfsBasePath + "/asyncClusteringJob";
+    String tableBasePath = basePath + "/asyncClusteringJob";
     HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, 
"false");
     CountDownLatch countDownLatch = new CountDownLatch(1);
 
     deltaStreamerTestRunner(ds, (r) -> {
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
       countDownLatch.countDown();
       return true;
     });
@@ -1171,7 +1171,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             shouldPassInClusteringInstantTime ? 
scheduleClusteringInstantTime.get() : null, false);
         HoodieClusteringJob clusterClusteringJob = new 
HoodieClusteringJob(jsc, clusterClusteringConfig);
         clusterClusteringJob.cluster(clusterClusteringConfig.retry);
-        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+        TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
         LOG.info("Cluster success");
       } else {
         LOG.warn("Clustering execution failed");
@@ -1184,7 +1184,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testAsyncClusteringService() throws Exception {
-    String tableBasePath = dfsBasePath + "/asyncClustering";
+    String tableBasePath = basePath + "/asyncClustering";
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 2000;
 
@@ -1195,12 +1195,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
       return true;
     });
     // There should be 4 commits, one of which should be a replace commit
-    TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
-    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
     TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath, 
sqlContext);
   }
 
@@ -1212,7 +1212,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
    */
   @Test
   public void testAsyncClusteringServiceWithConflicts() throws Exception {
-    String tableBasePath = dfsBasePath + "/asyncClusteringWithConflicts";
+    String tableBasePath = basePath + "/asyncClusteringWithConflicts";
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 2000;
 
@@ -1223,18 +1223,18 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
       return true;
     });
     // There should be 4 commits, one of which should be a replace commit
-    TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
-    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
     TestHelpers.assertDistinctRecordCount(1900, tableBasePath, sqlContext);
   }
 
   @Test
   public void testAsyncClusteringServiceWithCompaction() throws Exception {
-    String tableBasePath = dfsBasePath + "/asyncClusteringCompaction";
+    String tableBasePath = basePath + "/asyncClusteringCompaction";
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 2000;
 
@@ -1245,20 +1245,20 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", 
"true", "3"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, dfs);
-      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+      TestHelpers.assertAtleastNCompactionCommits(2, tableBasePath, fs);
+      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
       return true;
     });
     // There should be 4 commits, one of which should be a replace commit
-    TestHelpers.assertAtLeastNCommits(4, tableBasePath, dfs);
-    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNCommits(4, tableBasePath, fs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
     TestHelpers.assertDistinctRecordCount(totalRecords, tableBasePath, 
sqlContext);
   }
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testAsyncClusteringJobWithRetry(boolean 
retryLastFailedClusteringJob) throws Exception {
-    String tableBasePath = dfsBasePath + "/asyncClustering3";
+    String tableBasePath = basePath + "/asyncClustering3";
 
     // ingest data
     int totalRecords = 3000;
@@ -1270,7 +1270,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     ds.sync();
 
     // assert ingest successful
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
 
     // schedule a clustering job to build a clustering plan
     HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath, 
null, false, "schedule");
@@ -1281,7 +1281,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     ds2.sync();
 
     // convert clustering request into inflight, Simulate the last clustering 
failed scenario
-    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(dfs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
     List<HoodieInstant> hoodieClusteringInstants = 
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants().collect(Collectors.toList());
     HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
     HoodieInstant hoodieInflightInstant = 
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
 Option.empty());
@@ -1304,13 +1304,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @ParameterizedTest
   @ValueSource(strings = {"execute", "schedule", "scheduleAndExecute"})
   public void testHoodieAsyncClusteringJobWithScheduleAndExecute(String 
runningMode) throws Exception {
-    String tableBasePath = dfsBasePath + "/asyncClustering2";
+    String tableBasePath = basePath + "/asyncClustering2";
     HoodieDeltaStreamer ds = initialHoodieDeltaStreamer(tableBasePath, 3000, 
"false");
     HoodieClusteringJob scheduleClusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, true, runningMode);
 
     deltaStreamerTestRunner(ds, (r) -> {
       Exception exception = null;
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);
+      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
       try {
         int result = scheduleClusteringJob.cluster(0);
         if (result == 0) {
@@ -1330,16 +1330,16 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       }
       switch (runningMode.toLowerCase()) {
         case SCHEDULE_AND_EXECUTE: {
-          TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, dfs);
+          TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
           return true;
         }
         case SCHEDULE: {
-          TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, dfs);
-          TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
+          TestHelpers.assertAtLeastNReplaceRequests(2, tableBasePath, fs);
+          TestHelpers.assertNoReplaceCommits(tableBasePath, fs);
           return true;
         }
         case EXECUTE: {
-          TestHelpers.assertNoReplaceCommits(tableBasePath, dfs);
+          TestHelpers.assertNoReplaceCommits(tableBasePath, fs);
           return true;
         }
         default:
@@ -1355,37 +1355,37 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
    */
   @Test
   public void 
testBulkInsertsAndUpsertsWithSQLBasedTransformerFor2StepPipeline() throws 
Exception {
-    String tableBasePath = dfsBasePath + "/test_table2";
-    String downstreamTableBasePath = dfsBasePath + "/test_downstream_table2";
+    String tableBasePath = basePath + "/test_table2";
+    String downstreamTableBasePath = basePath + "/test_downstream_table2";
 
     // Initial bulk insert to ingest to first hudi table
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, true);
     // NOTE: We should not have need to set below config, 'datestr' should 
have assumed date partitioning
     
cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
-    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath, 
sqlContext);
-    String lastInstantForUpstreamTable = 
TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    String lastInstantForUpstreamTable = 
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // Now incrementally pull from the above hudi table and ingest to second 
table
     HoodieDeltaStreamer.Config downstreamCfg =
         TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, 
downstreamTableBasePath, WriteOperationType.BULK_INSERT,
             true, null);
-    new HoodieDeltaStreamer(downstreamCfg, jsc, dfs, 
hiveServer.getHiveConf()).sync();
+    new HoodieDeltaStreamer(downstreamCfg, jsc, fs, 
hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext);
     TestHelpers.assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
     TestHelpers.assertDistanceCountWithExactValue(1000, 
downstreamTableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, dfs, 1);
+    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
 
     // No new data => no commits for upstream table
     cfg.sourceLimit = 0;
-    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCountWithExactValue(1000, tableBasePath, 
sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // with no change in upstream table, no change in downstream too when 
pulled.
     HoodieDeltaStreamer.Config downstreamCfg1 =
@@ -1395,16 +1395,16 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TestHelpers.assertRecordCount(1000, downstreamTableBasePath, sqlContext);
     TestHelpers.assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
     TestHelpers.assertDistanceCountWithExactValue(1000, 
downstreamTableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, dfs, 1);
+    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
 
     // upsert() #1 on upstream hudi table
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
-    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCountWithExactValue(1950, tableBasePath, 
sqlContext);
-    lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", 
tableBasePath, dfs, 2);
+    lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", 
tableBasePath, fs, 2);
     List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
@@ -1418,7 +1418,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TestHelpers.assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
     TestHelpers.assertDistanceCountWithExactValue(2000, 
downstreamTableBasePath, sqlContext);
     String finalInstant =
-        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, dfs, 2);
+        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 2);
     counts = TestHelpers.countsPerCommit(downstreamTableBasePath, sqlContext);
     assertEquals(2000, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
@@ -1438,12 +1438,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testNullSchemaProvider() throws Exception {
-    String tableBasePath = dfsBasePath + "/test_table";
+    String tableBasePath = basePath + "/test_table";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, true,
         false, false, null, null);
     Exception e = assertThrows(HoodieException.class, () -> {
-      new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+      new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     }, "Should error out when schema provider is not provided");
     LOG.debug("Expected error during reading data from source ", e);
     assertTrue(e.getMessage().contains("Please provide a valid schema provider 
class!"));
@@ -1451,18 +1451,18 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testPayloadClassUpdate() throws Exception {
-    String dataSetBasePath = dfsBasePath + 
"/test_dataset_mor_payload_class_update";
+    String dataSetBasePath = basePath + 
"/test_dataset_mor_payload_class_update";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, "MERGE_ON_READ");
-    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
-    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
 
     //now assert that hoodie.properties file now has updated payload class name
     Properties props = new Properties();
@@ -1477,11 +1477,11 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testPartialPayloadClass() throws Exception {
-    String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
+    String dataSetBasePath = basePath + "/test_dataset_mor";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
           Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
           true, true, PartialUpdateAvroPayload.class.getName(), 
"MERGE_ON_READ");
-    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext);
 
     //now assert that hoodie.properties file now has updated payload class name
@@ -1496,18 +1496,18 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testPayloadClassUpdateWithCOWTable() throws Exception {
-    String dataSetBasePath = dfsBasePath + "/test_dataset_cow";
+    String dataSetBasePath = basePath + "/test_dataset_cow";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, null);
-    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(1000, dataSetBasePath, sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, true, DummyAvroPayload.class.getName(), null);
-    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf());
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
 
     //now assert that hoodie.properties file does not have payload class prop 
since it is a COW table
     Properties props = new Properties();
@@ -1522,13 +1522,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testFilterDupes() throws Exception {
-    String tableBasePath = dfsBasePath + "/test_dupes_table";
+    String tableBasePath = basePath + "/test_dupes_table";
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // Generate the same 1000 records + 1000 new ones for upsert
     cfg.filterDupes = true;
@@ -1536,7 +1536,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.operation = WriteOperationType.INSERT;
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(2000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
     // 1000 records for commit 00000 & 1000 for commit 00001
     List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1000, counts.get(0).getLong(1));
@@ -1621,7 +1621,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TypedProperties parquetProps = new TypedProperties();
 
     if (addCommonProps) {
-      populateCommonProps(parquetProps, dfsBasePath);
+      populateCommonProps(parquetProps, basePath);
     }
 
     parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
@@ -1631,16 +1631,16 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
     parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", 
partitionPath);
     if (useSchemaProvider) {
-      
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/" + sourceSchemaFile);
+      
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 basePath + "/" + sourceSchemaFile);
       if (hasTransformer) {
-        
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/" + targetSchemaFile);
+        
parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 basePath + "/" + targetSchemaFile);
       }
     }
     parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", 
parquetSourceRoot);
     if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
       
parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, 
emptyBatchParam);
     }
-    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + 
"/" + propsFileName);
+    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, fs, basePath + "/" 
+ propsFileName);
   }
 
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> 
transformerClassNames) throws Exception {
@@ -1648,14 +1648,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   private void testParquetDFSSource(boolean useSchemaProvider, List<String> 
transformerClassNames, boolean testEmptyBatch) throws Exception {
-    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfs" + testNum;
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
     int parquetRecordsCount = 10;
     boolean hasTransformer = transformerClassNames != null && 
!transformerClassNames.isEmpty();
     prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, false, null, null);
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", 
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
         PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : 
"");
 
-    String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
testEmptyBatch ? TestParquetDFSSourceEmptyBatch.class.getName()
                 : ParquetDFSSource.class.getName(),
@@ -1706,15 +1706,15 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     orcProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
     orcProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
     if (useSchemaProvider) {
-      
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/" + "source.avsc");
+      
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/" + "source.avsc");
       if (transformerClassNames != null) {
-        
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/" + "target.avsc");
+        
orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
basePath + "/" + "target.avsc");
       }
     }
     orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", 
ORC_SOURCE_ROOT);
-    UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, dfs, dfsBasePath + "/" 
+ PROPS_FILENAME_TEST_ORC);
+    UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, fs, basePath + "/" + 
PROPS_FILENAME_TEST_ORC);
 
-    String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
+    String tableBasePath = basePath + "/test_orc_source_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ORCDFSSource.class.getName(),
             transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
@@ -1727,7 +1727,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   private void prepareJsonKafkaDFSSource(String propsFileName, String 
autoResetValue, String topicName) throws IOException {
     // Properties used for testing delta-streamer with JsonKafka source
     TypedProperties props = new TypedProperties();
-    populateAllCommonProps(props, dfsBasePath, testUtils.brokerAddress());
+    populateAllCommonProps(props, basePath, testUtils.brokerAddress());
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.embed.timeline.server", "false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -1735,11 +1735,11 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     props.setProperty("hoodie.deltastreamer.source.dfs.root", 
JSON_KAFKA_SOURCE_ROOT);
     props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
     props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", 
kafkaCheckpointType);
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source_uber.avsc");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target_uber.avsc");
+    
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/source_uber.avsc");
+    
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
basePath + "/target_uber.avsc");
     props.setProperty("auto.offset.reset", autoResetValue);
 
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + 
propsFileName);
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/" + 
propsFileName);
   }
 
   /**
@@ -1750,14 +1750,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
    */
   private void testDeltaStreamerTransitionFromParquetToKafkaSource(boolean 
autoResetToLatest) throws Exception {
     // prep parquet source
-    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfsToKafka" + testNum;
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfsToKafka" + testNum;
     int parquetRecords = 10;
     prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, 
FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, 
HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
 
     prepareParquetDFSSource(true, false, "source_uber.avsc", 
"target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
         PARQUET_SOURCE_ROOT, false, "driver");
     // delta streamer w/ parquet source
-    String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum;
+    String tableBasePath = basePath + "/test_dfs_to_kafka" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
             Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
@@ -1793,7 +1793,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     topicName = "topic" + testNum;
     prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
     prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", 
topicName);
-    String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+    String tableBasePath = basePath + "/test_json_kafka_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
JsonKafkaSource.class.getName(),
             Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
@@ -1815,7 +1815,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     kafkaCheckpointType = "timestamp";
     prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, true, topicName);
     prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", 
topicName);
-    String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
+    String tableBasePath = basePath + "/test_json_kafka_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
JsonKafkaSource.class.getName(),
             Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
@@ -1865,7 +1865,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   private void testDeltaStreamerRestartAfterMissingHoodieProps(boolean 
testInitFailure) throws Exception {
-    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFilesDfs" + testNum;
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
     int parquetRecordsCount = 10;
     boolean hasTransformer = false;
     boolean useSchemaProvider = false;
@@ -1873,7 +1873,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", 
"target.avsc", PROPS_FILENAME_TEST_PARQUET,
         PARQUET_SOURCE_ROOT, false, "partition_path", "0");
 
-    String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
testInitFailure ? TestParquetDFSSourceEmptyBatch.class.getName() : 
ParquetDFSSource.class.getName(),
             null, PROPS_FILENAME_TEST_PARQUET, false,
@@ -1881,17 +1881,17 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     deltaStreamer.sync();
 
     if (testInitFailure) {
-      FileStatus[] fileStatuses = dfs.listStatus(new Path(tableBasePath + 
"/.hoodie/"));
+      FileStatus[] fileStatuses = fs.listStatus(new Path(tableBasePath + 
"/.hoodie/"));
       Arrays.stream(fileStatuses).filter(entry -> 
entry.getPath().getName().contains("commit") || 
entry.getPath().getName().contains("inflight")).forEach(entry -> {
         try {
-          dfs.delete(entry.getPath());
+          fs.delete(entry.getPath());
         } catch (IOException e) {
           e.printStackTrace();
         }
       });
     }
     // delete hoodie.properties
-    dfs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));
+    fs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));
 
     // restart the pipeline.
     if (testInitFailure) { // should succeed.
@@ -1937,7 +1937,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   private void prepareCsvDFSSource(
       boolean hasHeader, char sep, boolean useSchemaProvider, boolean 
hasTransformer) throws IOException {
-    String sourceRoot = dfsBasePath + "/csvFiles";
+    String sourceRoot = basePath + "/csvFiles";
     String recordKeyField = (hasHeader || useSchemaProvider) ? "_row_key" : 
"_c1";
     String partitionPath = (hasHeader || useSchemaProvider) ? "partition_path" 
: "_c2";
 
@@ -1947,9 +1947,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     csvProps.setProperty("hoodie.datasource.write.recordkey.field", 
recordKeyField);
     csvProps.setProperty("hoodie.datasource.write.partitionpath.field", 
partitionPath);
     if (useSchemaProvider) {
-      
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source-flattened.avsc");
+      
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/source-flattened.avsc");
       if (hasTransformer) {
-        
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target-flattened.avsc");
+        
csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
basePath + "/target-flattened.avsc");
       }
     }
     csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
@@ -1965,20 +1965,20 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       csvProps.setProperty("hoodie.deltastreamer.csv.header", 
Boolean.toString(hasHeader));
     }
 
-    UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, dfs, dfsBasePath + "/" 
+ PROPS_FILENAME_TEST_CSV);
+    UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, fs, basePath + "/" + 
PROPS_FILENAME_TEST_CSV);
 
     String path = sourceRoot + "/1.csv";
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     UtilitiesTestBase.Helpers.saveCsvToDFS(
         hasHeader, sep,
         Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 
CSV_NUM_RECORDS, true)),
-        dfs, path);
+        fs, path);
   }
 
   private void testCsvDFSSource(
       boolean hasHeader, char sep, boolean useSchemaProvider, List<String> 
transformerClassNames) throws Exception {
     prepareCsvDFSSource(hasHeader, sep, useSchemaProvider, 
transformerClassNames != null);
-    String tableBasePath = dfsBasePath + "/test_csv_table" + testNum;
+    String tableBasePath = basePath + "/test_csv_table" + testNum;
     String sourceOrderingField = (hasHeader || useSchemaProvider) ? 
"timestamp" : "_c0";
     HoodieDeltaStreamer deltaStreamer =
         new HoodieDeltaStreamer(TestHelpers.makeConfig(
@@ -2075,7 +2075,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   private void prepareSqlSource() throws IOException {
-    String sourceRoot = dfsBasePath + "sqlSourceFiles";
+    String sourceRoot = basePath + "sqlSourceFiles";
     TypedProperties sqlSourceProps = new TypedProperties();
     sqlSourceProps.setProperty("include", "base.properties");
     sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
@@ -2083,7 +2083,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
     
sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query","select 
* from test_sql_table");
 
-    UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, dfs, dfsBasePath 
+ "/" + PROPS_FILENAME_TEST_SQL_SOURCE);
+    UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, fs, basePath + 
"/" + PROPS_FILENAME_TEST_SQL_SOURCE);
 
     // Data generation
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
@@ -2099,7 +2099,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   @Test
   public void testSqlSourceSource() throws Exception {
     prepareSqlSource();
-    String tableBasePath = dfsBasePath + "/test_sql_source_table" + testNum++;
+    String tableBasePath = basePath + "/test_sql_source_table" + testNum++;
     HoodieDeltaStreamer deltaStreamer =
         new HoodieDeltaStreamer(TestHelpers.makeConfig(
             tableBasePath, WriteOperationType.INSERT, 
SqlSource.class.getName(),
@@ -2126,11 +2126,11 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       props.setProperty("hoodie.datasource.write.recordkey.field", "ID");
       props.setProperty("hoodie.datasource.write.partitionpath.field", 
"partition_path");
 
-      UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + 
"/test-jdbc-source.properties");
+      UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + 
"/test-jdbc-source.properties");
 
       int numRecords = 1000;
       int sourceLimit = 100;
-      String tableBasePath = dfsBasePath + "/triprec";
+      String tableBasePath = basePath + "/triprec";
       HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.INSERT, JdbcSource.class.getName(),
           null, "test-jdbc-source.properties", false,
           false, sourceLimit, false, null, null, "timestamp", null);
@@ -2140,7 +2140,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
       HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
       deltaStreamerTestRunner(deltaStreamer, cfg, (r) -> {
-        TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + 
((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, dfs);
+        TestHelpers.assertAtleastNCompactionCommits(numRecords / sourceLimit + 
((numRecords % sourceLimit == 0) ? 0 : 1), tableBasePath, fs);
         TestHelpers.assertRecordCount(numRecords, tableBasePath, sqlContext);
         return true;
       });
@@ -2151,8 +2151,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testHoodieIncrFallback() throws Exception {
-    String tableBasePath = dfsBasePath + "/incr_test_table";
-    String downstreamTableBasePath = dfsBasePath + 
"/incr_test_downstream_table";
+    String tableBasePath = basePath + "/incr_test_table";
+    String downstreamTableBasePath = basePath + "/incr_test_downstream_table";
 
     insertInTable(tableBasePath, 1, WriteOperationType.BULK_INSERT);
     HoodieDeltaStreamer.Config downstreamCfg =
@@ -2202,12 +2202,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testInsertOverwrite() throws Exception {
-    testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite", 
WriteOperationType.INSERT_OVERWRITE);
+    testDeltaStreamerWithSpecifiedOperation(basePath + "/insert_overwrite", 
WriteOperationType.INSERT_OVERWRITE);
   }
 
   @Test
   public void testInsertOverwriteTable() throws Exception {
-    testDeltaStreamerWithSpecifiedOperation(dfsBasePath + 
"/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
+    testDeltaStreamerWithSpecifiedOperation(basePath + 
"/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
   }
 
   @Disabled("Local run passing; flaky in CI environment.")
@@ -2215,7 +2215,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   public void testDeletePartitions() throws Exception {
     prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
         PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, 
"partition_path");
-    String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
+    String tableBasePath = basePath + "/test_parquet_table" + testNum;
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
             null, PROPS_FILENAME_TEST_PARQUET, false,
@@ -2242,7 +2242,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // setting the operationType
     cfg.operation = operationType;
@@ -2251,27 +2251,27 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     cfg.sourceLimit = 1000;
     new HoodieDeltaStreamer(cfg, jsc).sync();
     TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
     TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
   }
 
   @Test
   public void testFetchingCheckpointFromPreviousCommits() throws IOException {
-    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dfsBasePath + 
"/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(basePath + 
"/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
 
     TypedProperties properties = new TypedProperties();
     properties.setProperty("hoodie.datasource.write.recordkey.field","key");
     properties.setProperty("hoodie.datasource.write.partitionpath.field","pp");
     TestDeltaSync testDeltaSync = new TestDeltaSync(cfg, sparkSession, null, 
properties,
-        jsc, dfs, jsc.hadoopConfiguration(), null);
+        jsc, fs, jsc.hadoopConfiguration(), null);
 
     properties.put(HoodieTableConfig.NAME.key(), "sample_tbl");
-    HoodieTableMetaClient metaClient = 
HoodieTestUtils.init(jsc.hadoopConfiguration(), dfsBasePath, 
HoodieTableType.COPY_ON_WRITE, properties);
+    HoodieTableMetaClient metaClient = 
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, 
HoodieTableType.COPY_ON_WRITE, properties);
 
     Map<String, String> extraMetadata = new HashMap<>();
     extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "abc");
@@ -2295,17 +2295,17 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testDropPartitionColumns() throws Exception {
-    String tableBasePath = dfsBasePath + "/test_drop_partition_columns" + 
testNum++;
+    String tableBasePath = basePath + "/test_drop_partition_columns" + 
testNum++;
     // ingest data with dropping partition columns enabled
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
     cfg.configs.add(String.format("%s=%s", 
HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     ds.sync();
     // assert ingest successful
-    TestHelpers.assertAtLeastNCommits(1, tableBasePath, dfs);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
 
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
-            
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(dfs.getConf()).build());
+            
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
     // get schema from data file written in the latest commit
     Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
     assertNotNull(tableSchema);
@@ -2317,7 +2317,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
   @Test
   public void testForceEmptyMetaSync() throws Exception {
-    String tableBasePath = dfsBasePath + "/test_force_empty_meta_sync";
+    String tableBasePath = basePath + "/test_force_empty_meta_sync";
 
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
     cfg.sourceLimit = 0;
@@ -2325,7 +2325,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.enableMetaSync = true;
     cfg.forceEmptyMetaSync = true;
 
-    new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
     TestHelpers.assertRecordCount(0, tableBasePath, sqlContext);
 
     // make sure hive table is present
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index 8f54b0d34d..8ac8b616b5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -62,8 +62,8 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
       HoodieMultiTableDeltaStreamer.Config config = new 
HoodieMultiTableDeltaStreamer.Config();
       config.configFolder = configFolder;
       config.targetTableName = "dummy_table";
-      config.basePathPrefix = dfsBasePath + "/" + basePathPrefix;
-      config.propsFilePath = dfsBasePath + "/" + fileName;
+      config.basePathPrefix = basePath + "/" + basePathPrefix;
+      config.propsFilePath = basePath + "/" + fileName;
       config.tableType = "COPY_ON_WRITE";
       config.sourceClassName = sourceClassName;
       config.sourceOrderingField = "timestamp";
@@ -79,7 +79,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
 
   @Test
   public void testMetaSyncConfig() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", 
TestDataSource.class.getName(), true, true, null);
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config", 
TestDataSource.class.getName(), true, true, null);
     HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
     TableExecutionContext executionContext = 
streamer.getTableExecutionContexts().get(1);
     assertEquals("com.example.DummySyncTool1,com.example.DummySyncTool2", 
executionContext.getConfig().syncClientToolClassNames);
@@ -87,7 +87,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
 
   @Test
   public void testInvalidHiveSyncProps() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, dfsBasePath + 
"/config", TestDataSource.class.getName(), true, true, null);
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1, basePath + 
"/config", TestDataSource.class.getName(), true, true, null);
     Exception e = assertThrows(HoodieException.class, () -> {
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Should fail when hive sync table not provided with enableHiveSync 
flag");
@@ -97,7 +97,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
 
   @Test
   public void testInvalidPropsFilePath() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_INVALID_FILE, dfsBasePath + "/config", 
TestDataSource.class.getName(), true, true, null);
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_INVALID_FILE, basePath + "/config", 
TestDataSource.class.getName(), true, true, null);
     Exception e = assertThrows(IllegalArgumentException.class, () -> {
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Should fail when invalid props file is provided");
@@ -107,7 +107,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
 
   @Test
   public void testInvalidTableConfigFilePath() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, dfsBasePath + "/config", 
TestDataSource.class.getName(), true, true, null);
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE, basePath + "/config", 
TestDataSource.class.getName(), true, true, null);
     Exception e = assertThrows(IllegalArgumentException.class, () -> {
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Should fail when invalid table config props file path is provided");
@@ -117,11 +117,11 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
 
   @Test
   public void testCustomConfigProps() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", 
TestDataSource.class.getName(), false, false, SchemaRegistryProvider.class);
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config", 
TestDataSource.class.getName(), false, false, SchemaRegistryProvider.class);
     HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
     TableExecutionContext executionContext = 
streamer.getTableExecutionContexts().get(1);
     assertEquals(2, streamer.getTableExecutionContexts().size());
-    assertEquals(dfsBasePath + 
"/multi_table_dataset/uber_db/dummy_table_uber", 
executionContext.getConfig().targetBasePath);
+    assertEquals(basePath + "/multi_table_dataset/uber_db/dummy_table_uber", 
executionContext.getConfig().targetBasePath);
     assertEquals("uber_db.dummy_table_uber", 
executionContext.getConfig().targetTableName);
     assertEquals("topic1", 
executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP));
     assertEquals("_row_key", 
executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key()));
@@ -136,7 +136,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
   @Disabled
   public void testInvalidIngestionProps() {
     Exception e = assertThrows(Exception.class, () -> {
-      HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", 
TestDataSource.class.getName(), true, true, null);
+      HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config", 
TestDataSource.class.getName(), true, true, null);
       new HoodieMultiTableDeltaStreamer(cfg, jsc);
     }, "Creation of execution object should fail without kafka topic");
     LOG.debug("Creation of execution object failed with error: " + 
e.getMessage(), e);
@@ -155,18 +155,18 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
     testUtils.sendMessages(topicName1, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA)));
     testUtils.sendMessages(topicName2, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
 
-    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", 
JsonKafkaSource.class.getName(), false, false, null);
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config", 
JsonKafkaSource.class.getName(), false, false, null);
     HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
     List<TableExecutionContext> executionContexts = 
streamer.getTableExecutionContexts();
     TypedProperties properties = executionContexts.get(1).getProperties();
-    
properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source_uber.avsc");
-    
properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target_uber.avsc");
+    
properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 basePath + "/source_uber.avsc");
+    
properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 basePath + "/target_uber.avsc");
     properties.setProperty("hoodie.datasource.write.partitionpath.field", 
"timestamp");
     properties.setProperty("hoodie.deltastreamer.source.kafka.topic", 
topicName2);
     executionContexts.get(1).setProperties(properties);
     TypedProperties properties1 = executionContexts.get(0).getProperties();
-    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source_short_trip_uber.avsc");
-    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target_short_trip_uber.avsc");
+    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 basePath + "/source_short_trip_uber.avsc");
+    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 basePath + "/target_short_trip_uber.avsc");
     properties1.setProperty("hoodie.datasource.write.partitionpath.field", 
"timestamp");
     properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", 
topicName1);
     executionContexts.get(0).setProperties(properties1);
@@ -198,15 +198,15 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
   @Test
   public void testMultiTableExecutionWithParquetSource() throws IOException {
     // ingest test data to 2 parquet source paths
-    String parquetSourceRoot1 = dfsBasePath + "/parquetSrcPath1/";
+    String parquetSourceRoot1 = basePath + "/parquetSrcPath1/";
     prepareParquetDFSFiles(10, parquetSourceRoot1);
-    String parquetSourceRoot2 = dfsBasePath + "/parquetSrcPath2/";
+    String parquetSourceRoot2 = basePath + "/parquetSrcPath2/";
     prepareParquetDFSFiles(5, parquetSourceRoot2);
 
     // add only common props. later we can add per table props
     String parquetPropsFile = populateCommonPropsAndWriteToFile();
 
-    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(parquetPropsFile, dfsBasePath + "/config", 
ParquetDFSSource.class.getName(), false, false,
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(parquetPropsFile, basePath + "/config", 
ParquetDFSSource.class.getName(), false, false,
         false, "multi_table_parquet", null);
     HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
 
@@ -237,7 +237,7 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
 
   @Test
   public void testTableLevelProperties() throws IOException {
-    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", 
TestDataSource.class.getName(), false, false, null);
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, basePath + "/config", 
TestDataSource.class.getName(), false, false, null);
     HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
     List<TableExecutionContext> tableExecutionContexts = 
streamer.getTableExecutionContexts();
     tableExecutionContexts.forEach(tableExecutionContext -> {
@@ -255,8 +255,8 @@ public class TestHoodieMultiTableDeltaStreamer extends 
HoodieDeltaStreamerTestBa
 
   private String populateCommonPropsAndWriteToFile() throws IOException {
     TypedProperties commonProps = new TypedProperties();
-    populateCommonProps(commonProps, dfsBasePath);
-    UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, dfs, dfsBasePath + 
"/" + PROPS_FILENAME_TEST_PARQUET);
+    populateCommonProps(commonProps, basePath);
+    UtilitiesTestBase.Helpers.savePropsToDFS(commonProps, fs, basePath + "/" + 
PROPS_FILENAME_TEST_PARQUET);
     return PROPS_FILENAME_TEST_PARQUET;
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
index 37abaa56b1..1cda910b70 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -34,7 +34,7 @@ public class TestAvroDFSSource extends 
AbstractDFSSourceTestBase {
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
-    this.dfsRoot = dfsBasePath + "/avroFiles";
+    this.dfsRoot = basePath + "/avroFiles";
     this.fileSuffix = ".avro";
   }
 
@@ -53,4 +53,4 @@ public class TestAvroDFSSource extends 
AbstractDFSSourceTestBase {
   protected void writeNewDataToFile(List<HoodieRecord> records, Path path) 
throws IOException {
     Helpers.saveAvroToDFS(Helpers.toGenericRecords(records), path);
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
index 7b8eead14f..8ffa3173e1 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
@@ -38,7 +38,7 @@ public class TestCsvDFSSource extends 
AbstractDFSSourceTestBase {
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
-    this.dfsRoot = dfsBasePath + "/jsonFiles";
+    this.dfsRoot = basePath + "/jsonFiles";
     this.fileSuffix = ".json";
     this.useFlattenedSchema = true;
     this.schemaProvider = new FilebasedSchemaProvider(
@@ -57,6 +57,6 @@ public class TestCsvDFSSource extends 
AbstractDFSSourceTestBase {
   @Override
   public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws 
IOException {
     UtilitiesTestBase.Helpers.saveCsvToDFS(
-        true, '\t', Helpers.jsonifyRecords(records), dfs, path.toString());
+        true, '\t', Helpers.jsonifyRecords(records), fs, path.toString());
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
index 1ac3f91fa8..802a338b5c 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsSource.java
@@ -58,7 +58,7 @@ public class TestGcsEventsSource extends UtilitiesTestBase {
 
   @BeforeAll
   public static void beforeAll() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices();
   }
 
   @BeforeEach
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index 76c1c50b09..fde10b2d9a 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -37,7 +37,7 @@ public class TestJsonDFSSource extends 
AbstractDFSSourceTestBase {
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
-    this.dfsRoot = dfsBasePath + "/jsonFiles";
+    this.dfsRoot = basePath + "/jsonFiles";
     this.fileSuffix = ".json";
   }
 
@@ -51,6 +51,6 @@ public class TestJsonDFSSource extends 
AbstractDFSSourceTestBase {
   @Override
   public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws 
IOException {
     UtilitiesTestBase.Helpers.saveStringsToDFS(
-        Helpers.jsonifyRecords(records), dfs, path.toString());
+        Helpers.jsonifyRecords(records), fs, path.toString());
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
index 5ad590a82f..44489037e8 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -36,7 +36,7 @@ public class TestParquetDFSSource extends 
AbstractDFSSourceTestBase {
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
-    this.dfsRoot = dfsBasePath + "/parquetFiles";
+    this.dfsRoot = basePath + "/parquetFiles";
     this.fileSuffix = ".parquet";
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
index 3d89dc2bc9..55c9a6b86f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsSource.java
@@ -47,9 +47,9 @@ public class TestS3EventsSource extends 
AbstractCloudObjectsSourceTestBase {
   @BeforeEach
   public void setup() throws Exception {
     super.setup();
-    this.dfsRoot = dfsBasePath + "/parquetFiles";
+    this.dfsRoot = basePath + "/parquetFiles";
     this.fileSuffix = ".parquet";
-    dfs.mkdirs(new Path(dfsRoot));
+    fs.mkdirs(new Path(dfsRoot));
   }
 
   @AfterEach
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
index f2b3b1df94..35bc7884d7 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.utilities.sources;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -27,6 +25,9 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
@@ -59,7 +60,7 @@ public class TestSqlSource extends UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices();
   }
 
   @AfterAll
@@ -69,8 +70,8 @@ public class TestSqlSource extends UtilitiesTestBase {
 
   @BeforeEach
   public void setup() throws Exception {
-    dfsRoot = UtilitiesTestBase.dfsBasePath + "/parquetFiles";
-    UtilitiesTestBase.dfs.mkdirs(new Path(dfsRoot));
+    dfsRoot = UtilitiesTestBase.basePath + "/parquetFiles";
+    UtilitiesTestBase.fs.mkdirs(new Path(dfsRoot));
     props = new TypedProperties();
     super.setup();
     schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), 
jsc);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index e3bf39ad7d..2ac1b8b0bf 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -64,7 +64,7 @@ public abstract class TestAbstractDebeziumSource extends 
UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices();
   }
 
   @AfterAll
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index ff7d6cc2ed..534e14cc73 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -52,9 +52,9 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hive.service.server.HiveServer2;
 import org.apache.log4j.Level;
@@ -74,6 +74,7 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.BufferedReader;
 import java.io.FileInputStream;
@@ -102,19 +103,22 @@ import static 
org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
  */
 public class UtilitiesTestBase {
 
-  protected static String dfsBasePath;
+  @TempDir
+  protected static java.nio.file.Path sharedTempDir;
+  protected static FileSystem fs;
+  protected static String basePath;
   protected static HdfsTestService hdfsTestService;
   protected static MiniDFSCluster dfsCluster;
-  protected static DistributedFileSystem dfs;
-  protected transient JavaSparkContext jsc = null;
-  protected transient HoodieSparkEngineContext context = null;
-  protected transient SparkSession sparkSession = null;
-  protected transient SQLContext sqlContext;
   protected static HiveServer2 hiveServer;
   protected static HiveTestService hiveTestService;
   protected static ZookeeperTestService zookeeperTestService;
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
+  protected transient JavaSparkContext jsc;
+  protected transient HoodieSparkEngineContext context;
+  protected transient SparkSession sparkSession;
+  protected transient SQLContext sqlContext;
+
   @BeforeAll
   public static void setLogLevel() {
     Logger rootLogger = Logger.getRootLogger();
@@ -122,22 +126,33 @@ public class UtilitiesTestBase {
     Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
   }
 
-  public static void initTestServices(boolean needsHive, boolean 
needsZookeeper) throws Exception {
+  public static void initTestServices() throws Exception {
+    initTestServices(false, false, false);
+  }
 
-    if (hdfsTestService == null) {
-      hdfsTestService = new HdfsTestService();
+  public static void initTestServices(boolean needsHdfs, boolean needsHive, 
boolean needsZookeeper) throws Exception {
+    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    hadoopConf.set("hive.exec.scratchdir", System.getProperty("java.io.tmpdir") + 
"/hive"); // java.io.tmpdir is a JVM system property, not an environment variable
+
+    if (needsHdfs) {
+      hdfsTestService = new HdfsTestService(hadoopConf);
       dfsCluster = hdfsTestService.start(true);
-      dfs = dfsCluster.getFileSystem();
-      dfsBasePath = dfs.getWorkingDirectory().toString();
-      dfs.mkdirs(new Path(dfsBasePath));
+      fs = dfsCluster.getFileSystem();
+      basePath = fs.getWorkingDirectory().toString();
+      fs.mkdirs(new Path(basePath));
+    } else {
+      fs = FileSystem.getLocal(hadoopConf);
+      basePath = sharedTempDir.toUri().toString();
     }
+
     if (needsHive) {
-      hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf());
+      hiveTestService = new HiveTestService(hadoopConf);
       hiveServer = hiveTestService.start();
-      clearHiveDb();
+      clearHiveDb(basePath + "/dummy" + System.currentTimeMillis());
     }
+
     if (needsZookeeper) {
-      zookeeperTestService = new 
ZookeeperTestService(hdfsTestService.getHadoopConf());
+      zookeeperTestService = new ZookeeperTestService(hadoopConf);
       zookeeperTestService.start();
     }
   }
@@ -213,14 +228,14 @@ public class UtilitiesTestBase {
    * 
    * @throws IOException
    */
-  private static void clearHiveDb() throws Exception {
+  private static void clearHiveDb(String tempWriteablePath) throws Exception {
     // Create Dummy hive sync config
-    HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");
+    HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tempWriteablePath, 
"dummy");
     hiveSyncConfig.setHadoopConf(hiveTestService.getHiveConf());
     HoodieTableMetaClient.withPropertyBuilder()
       .setTableType(HoodieTableType.COPY_ON_WRITE)
       .setTableName(hiveSyncConfig.getString(META_SYNC_TABLE_NAME))
-      .initTable(dfs.getConf(), hiveSyncConfig.getString(META_SYNC_BASE_PATH));
+      .initTable(fs.getConf(), hiveSyncConfig.getString(META_SYNC_BASE_PATH));
 
     QueryBasedDDLExecutor ddlExecutor = new JDBCExecutor(hiveSyncConfig);
     ddlExecutor.runSQL("drop database if exists " + 
hiveSyncConfig.getString(META_SYNC_DATABASE_NAME));
@@ -375,16 +390,16 @@ public class UtilitiesTestBase {
     }
 
     public static TypedProperties setupSchemaOnDFS(String scope, String 
filename) throws IOException {
-      UtilitiesTestBase.Helpers.copyToDFS(scope + "/" + filename, dfs, 
dfsBasePath + "/" + filename);
+      UtilitiesTestBase.Helpers.copyToDFS(scope + "/" + filename, fs, basePath 
+ "/" + filename);
       TypedProperties props = new TypedProperties();
-      
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/" + filename);
+      
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/" + filename);
       return props;
     }
 
     public static TypedProperties setupSchemaOnDFSWithAbsoluteScope(String 
scope, String filename) throws IOException {
-      UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(scope + "/" + 
filename, dfs, dfsBasePath + "/" + filename);
+      UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(scope + "/" + 
filename, fs, basePath + "/" + filename);
       TypedProperties props = new TypedProperties();
-      
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/" + filename);
+      
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
basePath + "/" + filename);
       return props;
     }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
index 510843b170..d4593f64fd 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractCloudObjectsSourceTestBase.java
@@ -54,7 +54,7 @@ public abstract class AbstractCloudObjectsSourceTestBase 
extends UtilitiesTestBa
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices();
   }
 
   @AfterAll
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
index ea218f53be..d49d197fd5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractDFSSourceTestBase.java
@@ -62,7 +62,7 @@ public abstract class AbstractDFSSourceTestBase extends 
UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices(true, false, false);
   }
 
   @AfterAll
@@ -117,7 +117,7 @@ public abstract class AbstractDFSSourceTestBase extends 
UtilitiesTestBase {
    */
   @Test
   public void testReadingFromSource() throws IOException {
-    dfs.mkdirs(new Path(dfsRoot));
+    fs.mkdirs(new Path(dfsRoot));
     SourceFormatAdapter sourceFormatAdapter = new 
SourceFormatAdapter(prepareDFSSource());
 
     // 1. Extract without any checkpoint => get all the data, respecting 
sourceLimit
@@ -125,7 +125,7 @@ public abstract class AbstractDFSSourceTestBase extends 
UtilitiesTestBase {
         sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), 
Long.MAX_VALUE).getBatch());
     // Test respecting sourceLimit
     int sourceLimit = 10;
-    RemoteIterator<LocatedFileStatus> files = 
dfs.listFiles(generateOneFile("1", "000", 100), true);
+    RemoteIterator<LocatedFileStatus> files = 
fs.listFiles(generateOneFile("1", "000", 100), true);
     FileStatus file1Status = files.next();
     assertTrue(file1Status.getLen() > sourceLimit);
     assertEquals(Option.empty(),
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index 3e1c5a92b1..74dafbd2ff 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -22,6 +22,7 @@ package org.apache.hudi.utilities.transform;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -49,19 +50,19 @@ public class TestSqlFileBasedTransformer extends 
UtilitiesTestBase {
 
   @BeforeAll
   public static void initClass() throws Exception {
-    UtilitiesTestBase.initTestServices(false, false);
+    UtilitiesTestBase.initTestServices();
     UtilitiesTestBase.Helpers.copyToDFS(
         "delta-streamer-config/sql-file-transformer.sql",
-        UtilitiesTestBase.dfs,
-        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
+        UtilitiesTestBase.fs,
+        UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
     UtilitiesTestBase.Helpers.copyToDFS(
         "delta-streamer-config/sql-file-transformer-invalid.sql",
-        UtilitiesTestBase.dfs,
-        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
+        UtilitiesTestBase.fs,
+        UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
     UtilitiesTestBase.Helpers.copyToDFS(
         "delta-streamer-config/sql-file-transformer-empty.sql",
-        UtilitiesTestBase.dfs,
-        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
+        UtilitiesTestBase.fs,
+        UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
   }
 
   @AfterAll
@@ -98,7 +99,7 @@ public class TestSqlFileBasedTransformer extends 
UtilitiesTestBase {
     // Test if the class throws hoodie IO exception correctly when given a 
incorrect config.
     props.setProperty(
         "hoodie.deltastreamer.transformer.sql.file",
-        UtilitiesTestBase.dfsBasePath + "/non-exist-sql-file.sql");
+        UtilitiesTestBase.basePath + "/non-exist-sql-file.sql");
     assertThrows(
         HoodieIOException.class,
         () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, 
props));
@@ -109,7 +110,7 @@ public class TestSqlFileBasedTransformer extends 
UtilitiesTestBase {
     // Test if the SQL file based transformer works as expected for the 
invalid SQL statements.
     props.setProperty(
         "hoodie.deltastreamer.transformer.sql.file",
-        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-invalid.sql");
+        UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
     assertThrows(
         ParseException.class,
         () -> sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, 
props));
@@ -120,7 +121,7 @@ public class TestSqlFileBasedTransformer extends 
UtilitiesTestBase {
     // Test if the SQL file based transformer works as expected for the empty 
SQL statements.
     props.setProperty(
         "hoodie.deltastreamer.transformer.sql.file",
-        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer-empty.sql");
+        UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
     Dataset<Row> emptyRow = sqlFileTransformer.apply(jsc, sparkSession, 
inputDatasetRows, props);
     String[] actualRows = 
emptyRow.as(Encoders.STRING()).collectAsList().toArray(new String[0]);
     String[] expectedRows = emptyDatasetRow.collectAsList().toArray(new 
String[0]);
@@ -132,7 +133,7 @@ public class TestSqlFileBasedTransformer extends 
UtilitiesTestBase {
     // Test if the SQL file based transformer works as expected for the 
correct input.
     props.setProperty(
         "hoodie.deltastreamer.transformer.sql.file",
-        UtilitiesTestBase.dfsBasePath + "/sql-file-transformer.sql");
+        UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
     Dataset<Row> transformedRow =
         sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
 

Reply via email to