This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 556be9428fc [HUDI-7188] Revert "[HUDI-6836] Shutting down 
deltastreamer in tests and shutting down metrics for write client (#9667)" 
(#10264)
556be9428fc is described below

commit 556be9428fc19824c2533b76cb6240509799e684
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Dec 6 18:43:05 2023 -0800

    [HUDI-7188] Revert "[HUDI-6836] Shutting down deltastreamer in tests and 
shutting down metrics for write client (#9667)" (#10264)
    
    This reverts commit 594b0ea4abe0bebfaa1ad742877e707d2a39f2d4.
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |    4 -
 .../apache/hudi/functional/TestCOWDataSource.scala |    1 -
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 1204 ++++++++------------
 3 files changed, 479 insertions(+), 730 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index d94249e08e8..a3aa6699027 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1358,10 +1358,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     // before this point
     this.index.close();
     this.tableServiceClient.close();
-    // shutdown metrics
-    if (this.metrics != null && this.metrics.getMetrics() != null) {
-      this.metrics.getMetrics().shutdown();
-    }
   }
 
   public void setWriteTimer(String commitType) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index c71226d0130..f500ea83120 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1604,7 +1604,6 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
 
     assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
     assertEquals(false, Metrics.isInitialized(basePath), "Metrics should be 
shutdown")
-    assertEquals(false, Metrics.isInitialized(basePath + "/.hoodie/metadata"), 
"Metrics should be shutdown for metadata table")
   }
 
   @ParameterizedTest
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 5ae242234a0..03208a0c0e5 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -419,17 +419,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT,
             
Collections.singletonList(TripsWithDistanceTransformer.class.getName()),
             propsFilename, false), jsc);
-    try {
-      deltaStreamer.sync();
-      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-          .setConf(new Configuration()).setBasePath(tableBasePath).build();
-      assertEquals(
-          expectedKeyGeneratorClassName, 
metaClient.getTableConfig().getKeyGeneratorClassName());
-      Dataset<Row> res = sqlContext.read().format("hudi").load(tableBasePath);
-      assertEquals(1000, res.count());
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    deltaStreamer.sync();
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(new Configuration()).setBasePath(tableBasePath).build();
+    assertEquals(
+        expectedKeyGeneratorClassName, 
metaClient.getTableConfig().getKeyGeneratorClassName());
+    Dataset<Row> res = sqlContext.read().format("hudi").load(tableBasePath);
+    assertEquals(1000, res.count());
   }
 
   @Test
@@ -453,16 +449,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(HIVE_STYLE_PARTITIONING_ENABLE.key() + "=" + configFlag);
     cfg.configs.add(URL_ENCODE_PARTITIONING.key() + "=" + configFlag);
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      deltaStreamer.getIngestionService().ingestOnce();
-      // Create new metaClient from tablePath
-      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get())
-          .setBasePath(tablePath).build();
-      assertEquals(configFlag, 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable()));
-      assertEquals(configFlag, 
Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning()));
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    deltaStreamer.getIngestionService().ingestOnce();
+    // Create new metaClient from tablePath
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get())
+        .setBasePath(tablePath).build();
+    assertEquals(configFlag, 
Boolean.parseBoolean(metaClient.getTableConfig().getHiveStylePartitioningEnable()));
+    assertEquals(configFlag, 
Boolean.parseBoolean(metaClient.getTableConfig().getUrlEncodePartitioning()));
   }
 
   @ParameterizedTest
@@ -503,31 +495,27 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true");
     cfg.configs.add("hoodie.bootstrap.parallelism=5");
     cfg.targetBasePath = newDatasetBasePath;
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-      Dataset<Row> res = 
sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath);
-      LOG.info("Schema :");
-      res.printSchema();
-      assertRecordCount(1950, newDatasetBasePath, sqlContext);
-      res.registerTempTable("bootstrapped");
-      assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key 
from bootstrapped").count());
-      // NOTE: To fetch record's count Spark will optimize the query fetching 
minimal possible amount
-      //       of data, which might not provide adequate amount of test 
coverage
-      sqlContext.sql("select * from bootstrapped").show();
-
-      StructField[] fields = res.schema().fields();
-      List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
-      List<String> expectedFieldNames = 
Arrays.asList(sourceDf.schema().fieldNames());
-      assertEquals(expectedFieldNames.size(), fields.length);
-      assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
-      assertTrue(fieldNames.containsAll(expectedFieldNames));
-      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
-      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, bootstrapSourcePath);
-      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, newDatasetBasePath);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    Dataset<Row> res = 
sqlContext.read().format("org.apache.hudi").load(newDatasetBasePath);
+    LOG.info("Schema :");
+    res.printSchema();
+
+    assertRecordCount(1950, newDatasetBasePath, sqlContext);
+    res.registerTempTable("bootstrapped");
+    assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from 
bootstrapped").count());
+    // NOTE: To fetch record's count Spark will optimize the query fetching 
minimal possible amount
+    //       of data, which might not provide adequate amount of test coverage
+    sqlContext.sql("select * from bootstrapped").show();
+
+    StructField[] fields = res.schema().fields();
+    List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
+    List<String> expectedFieldNames = 
Arrays.asList(sourceDf.schema().fieldNames());
+    assertEquals(expectedFieldNames.size(), fields.length);
+    assertTrue(fieldNames.containsAll(HoodieRecord.HOODIE_META_COLUMNS));
+    assertTrue(fieldNames.containsAll(expectedFieldNames));
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, bootstrapSourcePath);
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, newDatasetBasePath);
   }
 
   @Test
@@ -560,15 +548,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
   }
 
   private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, 
Integer expected, String tableBasePath, String metadata, Integer totalCommits) 
throws Exception {
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-      assertRecordCount(expected, tableBasePath, sqlContext);
-      assertDistanceCount(expected, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs, 
totalCommits);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    assertRecordCount(expected, tableBasePath, sqlContext);
+    assertDistanceCount(expected, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata(metadata, tableBasePath, fs, 
totalCommits);
   }
 
   // TODO add tests w/ disabled reconciliation
@@ -587,14 +570,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     if (!useSchemaPostProcessor) {
       
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
 + "=false");
     }
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-      assertRecordCount(1000, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+
     // Upsert data produced with Schema B, pass Schema B
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()),
         PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
@@ -605,12 +584,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     if (!useSchemaPostProcessor) {
       
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
 + "=false");
     }
-    ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
     // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
     assertRecordCount(1450, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
@@ -637,17 +611,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       
cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key()
 + "=false");
     }
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
-    ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-      // again, 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
-      assertRecordCount(1900, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
-      counts = countsPerCommit(tableBasePath, sqlContext);
-      assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    // again, 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
+    assertRecordCount(1900, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
+    counts = countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
     TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
     Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
@@ -686,15 +655,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), 
MetricsReporterType.INMEMORY.name()));
     cfg.continuousMode = false;
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-      assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
-      assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be 
shutdown");
-      assertFalse(Metrics.isInitialized(tableBasePath + "/.hoodie/metadata"), 
"Metrics should be shutdown for metadata table");
-      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
-    }  finally {
-      ds.shutdownGracefully();
-    }
+    ds.sync();
+    assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+    assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be 
shutdown");
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
   @Timeout(600)
@@ -722,15 +686,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), 
MetricsReporterType.INMEMORY.name()));
     cfg.continuousMode = false;
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-      assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
-      assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be 
shutdown");
-      assertFalse(Metrics.isInitialized(tableBasePath + "/.hoodie/metadata"), 
"Metrics should be shutdown for metadata table");
-      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
-    }  finally {
-      ds.shutdownGracefully();
-    }
+    ds.sync();
+    assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+    assertFalse(Metrics.isInitialized(tableBasePath), "Metrics should be 
shutdown");
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
   private void testUpsertsContinuousMode(HoodieTableType tableType, String 
tempDir, HoodieRecordType recordType) throws Exception {
@@ -851,35 +810,27 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    HoodieDeltaStreamer ds2 = null;
-    try {
-      ds.sync();
-      // assert ingest successful
-      TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
-
-      // schedule a clustering job to build a clustering plan and transition 
to inflight
-      HoodieClusteringJob clusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
-      clusteringJob.cluster(0);
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
-      List<HoodieInstant> hoodieClusteringInstants = 
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
-      HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
-      
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
 Option.empty());
-
-      // do another ingestion with inline clustering enabled
-      cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", 
"true", "2", "", ""));
-      cfg.retryLastPendingInlineClusteringJob = true;
-      ds2 = new HoodieDeltaStreamer(cfg, jsc);
-      ds2.sync();
-      String completeClusteringTimeStamp = 
meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
-      assertEquals(clusteringRequest.getTimestamp(), 
completeClusteringTimeStamp);
-      TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
-      TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
-    } finally {
-      ds.shutdownGracefully();
-      if (ds2 != null) {
-        ds2.shutdownGracefully();
-      }
-    }
+    ds.sync();
+    // assert ingest successful
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+    // schedule a clustering job to build a clustering plan and transition to 
inflight
+    HoodieClusteringJob clusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
+    clusteringJob.cluster(0);
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+    List<HoodieInstant> hoodieClusteringInstants = 
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
+    HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
+    
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
 Option.empty());
+
+    // do another ingestion with inline clustering enabled
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true", 
"2", "", ""));
+    cfg.retryLastPendingInlineClusteringJob = true;
+    HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
+    ds2.sync();
+    String completeClusteringTimeStamp = 
meta.reloadActiveTimeline().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
+    assertEquals(clusteringRequest.getTimestamp(), 
completeClusteringTimeStamp);
+    TestHelpers.assertAtLeastNCommits(2, tableBasePath, fs);
+    TestHelpers.assertAtLeastNReplaceCommits(1, tableBasePath, fs);
   }
 
   @Test
@@ -902,32 +853,28 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     // sync twice and trigger compaction
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
-      prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null, dataGenerator, "001");
-      deltaStreamer.sync();
-      TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
-      TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
-
-      // delete compaction commit
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
-      HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
-      HoodieInstant commitInstant = timeline.lastInstant().get();
-      String commitFileName = tableBasePath + "/.hoodie/" + 
commitInstant.getFileName();
-      fs.delete(new Path(commitFileName), false);
-
-      // sync again
-      prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false, 
null, null, dataGenerator, "002");
-      deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
-      deltaStreamer.sync();
-      TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
-      meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
-      timeline = meta.getActiveTimeline().getRollbackTimeline();
-      assertEquals(1, timeline.getInstants().size());
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    deltaStreamer.sync();
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+    prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null, dataGenerator, "001");
+    deltaStreamer.sync();
+    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
+    TestHelpers.assertAtleastNCompactionCommits(1, tableBasePath, fs);
+
+    // delete compaction commit
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTimeline timeline = 
meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+    HoodieInstant commitInstant = timeline.lastInstant().get();
+    String commitFileName = tableBasePath + "/.hoodie/" + 
commitInstant.getFileName();
+    fs.delete(new Path(commitFileName), false);
+
+    // sync again
+    prepareParquetDFSUpdates(100, PARQUET_SOURCE_ROOT, "3.parquet", false, 
null, null, dataGenerator, "002");
+    deltaStreamer = new HoodieDeltaStreamer(deltaCfg, jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertAtleastNDeltaCommits(3, tableBasePath, fs);
+    meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+    timeline = meta.getActiveTimeline().getRollbackTimeline();
+    assertEquals(1, timeline.getInstants().size());
   }
 
   @ParameterizedTest
@@ -949,104 +896,100 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("%s=%s", 
HoodieWriteConfig.MARKERS_TYPE.key(), "DIRECT"));
     cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      deltaStreamerTestRunner(ds, cfg, (r) -> {
-        TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
-        return true;
-      });
-
-      TestHelpers.assertAtLeastNCommits(6, tableBasePath, fs);
+    deltaStreamerTestRunner(ds, cfg, (r) -> {
       TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
+      return true;
+    });
 
-      // Step 2 : Get the first replacecommit and extract the corresponding 
replaced file IDs.
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
-      HoodieTimeline replacedTimeline = 
meta.reloadActiveTimeline().getCompletedReplaceTimeline();
-      Option<HoodieInstant> firstReplaceHoodieInstant = 
replacedTimeline.nthFromLastInstant(1);
-      assertTrue(firstReplaceHoodieInstant.isPresent());
-
-      Option<byte[]> firstReplaceHoodieInstantDetails = 
replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
-      HoodieReplaceCommitMetadata firstReplaceMetadata = 
HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(), 
HoodieReplaceCommitMetadata.class);
-      Map<String, List<String>> partitionToReplaceFileIds = 
firstReplaceMetadata.getPartitionToReplaceFileIds();
-      String partitionName = null;
-      List replacedFileIDs = null;
-      for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
-        partitionName = String.valueOf(entry.getKey());
-        replacedFileIDs = (List) entry.getValue();
-      }
-
-      assertNotNull(partitionName);
-      assertNotNull(replacedFileIDs);
-
-      // Step 3 : Based to replacedFileIDs , get the corresponding complete 
path.
-      ArrayList<String> replacedFilePaths = new ArrayList<>();
-      Path partitionPath = new Path(meta.getBasePath(), partitionName);
-      RemoteIterator<LocatedFileStatus> hoodieFiles = 
meta.getFs().listFiles(partitionPath, true);
-      while (hoodieFiles.hasNext()) {
-        LocatedFileStatus f = hoodieFiles.next();
-        String file = f.getPath().toUri().toString();
-        for (Object replacedFileID : replacedFileIDs) {
-          if (file.contains(String.valueOf(replacedFileID))) {
-            replacedFilePaths.add(file);
-          }
+    TestHelpers.assertAtLeastNCommits(6, tableBasePath, fs);
+    TestHelpers.assertAtLeastNReplaceCommits(2, tableBasePath, fs);
+
+    // Step 2 : Get the first replacecommit and extract the corresponding 
replaced file IDs.
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+    HoodieTimeline replacedTimeline = 
meta.reloadActiveTimeline().getCompletedReplaceTimeline();
+    Option<HoodieInstant> firstReplaceHoodieInstant = 
replacedTimeline.nthFromLastInstant(1);
+    assertTrue(firstReplaceHoodieInstant.isPresent());
+
+    Option<byte[]> firstReplaceHoodieInstantDetails = 
replacedTimeline.getInstantDetails(firstReplaceHoodieInstant.get());
+    HoodieReplaceCommitMetadata firstReplaceMetadata = 
HoodieReplaceCommitMetadata.fromBytes(firstReplaceHoodieInstantDetails.get(), 
HoodieReplaceCommitMetadata.class);
+    Map<String, List<String>> partitionToReplaceFileIds = 
firstReplaceMetadata.getPartitionToReplaceFileIds();
+    String partitionName = null;
+    List replacedFileIDs = null;
+    for (Map.Entry entry : partitionToReplaceFileIds.entrySet()) {
+      partitionName = String.valueOf(entry.getKey());
+      replacedFileIDs = (List) entry.getValue();
+    }
+
+    assertNotNull(partitionName);
+    assertNotNull(replacedFileIDs);
+
+    // Step 3 : Based to replacedFileIDs , get the corresponding complete path.
+    ArrayList<String> replacedFilePaths = new ArrayList<>();
+    Path partitionPath = new Path(meta.getBasePath(), partitionName);
+    RemoteIterator<LocatedFileStatus> hoodieFiles = 
meta.getFs().listFiles(partitionPath, true);
+    while (hoodieFiles.hasNext()) {
+      LocatedFileStatus f = hoodieFiles.next();
+      String file = f.getPath().toUri().toString();
+      for (Object replacedFileID : replacedFileIDs) {
+        if (file.contains(String.valueOf(replacedFileID))) {
+          replacedFilePaths.add(file);
         }
       }
+    }
 
-      assertFalse(replacedFilePaths.isEmpty());
-
-      // Step 4 : Add commits with insert of 1 record and trigger sync/async 
cleaner and archive.
-      List<String> configs = getTableServicesConfigs(1, "true", "true", "6", 
"", "");
-      configs.add(String.format("%s=%s", 
HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS"));
-      configs.add(String.format("%s=%s", 
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
-      configs.add(String.format("%s=%s", 
HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4"));
-      configs.add(String.format("%s=%s", 
HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5"));
-      configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), 
asyncClean));
-      configs.add(String.format("%s=%s", 
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
-      configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), 
"DIRECT"));
-      if (asyncClean) {
-        configs.add(String.format("%s=%s", 
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
-            WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
-        configs.add(String.format("%s=%s", 
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
-            HoodieFailedWritesCleaningPolicy.LAZY.name()));
-        configs.add(String.format("%s=%s", 
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
-            InProcessLockProvider.class.getName()));
-      }
-      addRecordMerger(recordType, configs);
-      cfg.configs = configs;
-      cfg.continuousMode = false;
-      // timeline as of now. no cleaner and archival kicked in.
-      // c1, c2, rc3, c4, c5, rc6,
-
-      ds = new HoodieDeltaStreamer(cfg, jsc);
-      ds.sync();
-      // after 1 round of sync, timeline will be as follows
-      // just before clean
-      // c1, c2, rc3, c4, c5, rc6, c7
-      // after clean
-      // c1, c2, rc3, c4, c5, rc6, c7, c8.clean (earliest commit to retain is 
c7)
-      // after archival (retain 4 commits)
-      // c4, c5, rc6, c7, c8.clean
-
-      // old code has 2 sync() calls. book-keeping the sequence for now.
-      // after 2nd round of sync
-      // just before clean
-      // c4, c5, rc6, c7, c8.clean, c9
-      // after clean
-      // c4, c5, rc6, c7, c8.clean, c9, c10.clean (earliest commit to retain 
c9)
-      // after archival
-      // c5, rc6, c7, c8.clean, c9, c10.clean
-
-      // Step 5 : FirstReplaceHoodieInstant should not be retained.
-      long count = 
meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant
 -> firstReplaceHoodieInstant.get().equals(instant)).count();
-      assertEquals(0, count);
-
-      // Step 6 : All the replaced files in firstReplaceHoodieInstant should 
be deleted through sync/async cleaner.
-      for (String replacedFilePath : replacedFilePaths) {
-        assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
-      }
-      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
-    } finally {
-      ds.shutdownGracefully();
+    assertFalse(replacedFilePaths.isEmpty());
+
+    // Step 4 : Add commits with insert of 1 record and trigger sync/async 
cleaner and archive.
+    List<String> configs = getTableServicesConfigs(1, "true", "true", "6", "", 
"");
+    configs.add(String.format("%s=%s", HoodieCleanConfig.CLEANER_POLICY.key(), 
"KEEP_LATEST_COMMITS"));
+    configs.add(String.format("%s=%s", 
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "1"));
+    configs.add(String.format("%s=%s", 
HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), "4"));
+    configs.add(String.format("%s=%s", 
HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), "5"));
+    configs.add(String.format("%s=%s", HoodieCleanConfig.ASYNC_CLEAN.key(), 
asyncClean));
+    configs.add(String.format("%s=%s", 
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key(), "1"));
+    configs.add(String.format("%s=%s", HoodieWriteConfig.MARKERS_TYPE.key(), 
"DIRECT"));
+    if (asyncClean) {
+      configs.add(String.format("%s=%s", 
HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
+          WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name()));
+      configs.add(String.format("%s=%s", 
HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
+          HoodieFailedWritesCleaningPolicy.LAZY.name()));
+      configs.add(String.format("%s=%s", 
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
+          InProcessLockProvider.class.getName()));
+    }
+    addRecordMerger(recordType, configs);
+    cfg.configs = configs;
+    cfg.continuousMode = false;
+    // timeline as of now. no cleaner and archival kicked in.
+    // c1, c2, rc3, c4, c5, rc6,
+
+    ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    // after 1 round of sync, timeline will be as follows
+    // just before clean
+    // c1, c2, rc3, c4, c5, rc6, c7
+    // after clean
+    // c1, c2, rc3, c4, c5, rc6, c7, c8.clean (earliest commit to retain is c7)
+    // after archival (retain 4 commits)
+    // c4, c5, rc6, c7, c8.clean
+
+    // old code has 2 sync() calls. book-keeping the sequence for now.
+    // after 2nd round of sync
+    // just before clean
+    // c4, c5, rc6, c7, c8.clean, c9
+    // after clean
+    // c4, c5, rc6, c7, c8.clean, c9, c10.clean (earliest commit to retain c9)
+    // after archival
+    // c5, rc6, c7, c8.clean, c9, c10.clean
+
+    // Step 5 : FirstReplaceHoodieInstant should not be retained.
+    long count = 
meta.reloadActiveTimeline().getCompletedReplaceTimeline().getInstantsAsStream().filter(instant
 -> firstReplaceHoodieInstant.get().equals(instant)).count();
+    assertEquals(0, count);
+
+    // Step 6 : All the replaced files in firstReplaceHoodieInstant should be 
deleted through sync/async cleaner.
+    for (String replacedFilePath : replacedFilePaths) {
+      assertFalse(meta.getFs().exists(new Path(replacedFilePath)));
     }
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
   private List<String> getAllMultiWriterConfigs() {
@@ -1284,47 +1227,39 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "false", 
"0", "false", "0"));
     cfg.configs.addAll(getAllMultiWriterConfigs());
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    HoodieDeltaStreamer ds2 = null;
-    try {
-      ds.sync();
+    ds.sync();
 
-      // assert ingest successful
-      TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    // assert ingest successful
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
 
-      // schedule a clustering job to build a clustering plan
-      HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath, 
null, false, "schedule");
-      schedule.cluster(0);
+    // schedule a clustering job to build a clustering plan
+    HoodieClusteringJob schedule = initialHoodieClusteringJob(tableBasePath, 
null, false, "schedule");
+    schedule.cluster(0);
 
-      // do another ingestion
-      ds2 = new HoodieDeltaStreamer(cfg, jsc);
-      ds2.sync();
+    // do another ingestion
+    HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
+    ds2.sync();
 
-      // convert clustering request into inflight, Simulate the last 
clustering failed scenario
-      HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
-      List<HoodieInstant> hoodieClusteringInstants = 
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
-      HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
-      HoodieInstant hoodieInflightInstant = 
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
 Option.empty());
+    // convert clustering request into inflight, Simulate the last clustering 
failed scenario
+    HoodieTableMetaClient meta = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tableBasePath).build();
+    List<HoodieInstant> hoodieClusteringInstants = 
meta.getActiveTimeline().filterPendingReplaceTimeline().getInstants();
+    HoodieInstant clusteringRequest = hoodieClusteringInstants.get(0);
+    HoodieInstant hoodieInflightInstant = 
meta.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringRequest,
 Option.empty());
 
-      // trigger a scheduleAndExecute clustering job
-      // when retryFailedClustering true => will rollback and re-execute 
failed clustering plan with same instant timestamp.
-      // when retryFailedClustering false => will make and execute a new 
clustering plan with new instant timestamp.
-      HoodieClusteringJob scheduleAndExecute = 
initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", 
retryLastFailedClusteringJob, recordType);
-      scheduleAndExecute.cluster(0);
+    // trigger a scheduleAndExecute clustering job
+    // when retryFailedClustering true => will rollback and re-execute failed 
clustering plan with same instant timestamp.
+    // when retryFailedClustering false => will make and execute a new 
clustering plan with new instant timestamp.
+    HoodieClusteringJob scheduleAndExecute = 
initialHoodieClusteringJob(tableBasePath, null, false, "scheduleAndExecute", 
retryLastFailedClusteringJob, recordType);
+    scheduleAndExecute.cluster(0);
 
-      String completeClusteringTimeStamp = 
meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
+    String completeClusteringTimeStamp = 
meta.getActiveTimeline().reload().getCompletedReplaceTimeline().lastInstant().get().getTimestamp();
 
-      if (retryLastFailedClusteringJob) {
-        assertEquals(clusteringRequest.getTimestamp(), 
completeClusteringTimeStamp);
-      } else {
-        
assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
-      }
-      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
-    } finally {
-      ds.shutdownGracefully();
-      if (ds2 != null) {
-        ds2.shutdownGracefully();
-      }
+    if (retryLastFailedClusteringJob) {
+      assertEquals(clusteringRequest.getTimestamp(), 
completeClusteringTimeStamp);
+    } else {
+      
assertFalse(clusteringRequest.getTimestamp().equalsIgnoreCase(completeClusteringTimeStamp));
     }
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
   @ParameterizedTest
@@ -1550,77 +1485,51 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     addRecordMerger(recordType, cfg.configs);
     // NOTE: We should not have need to set below config, 'datestr' should 
have assumed date partitioning
     
cfg.configs.add("hoodie.datasource.hive_sync.partition_fields=year,month,day");
-    String lastInstantForUpstreamTable = null;
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs, 
hiveServer.getHiveConf());
-    try {
-      ds.sync();
-      assertRecordCount(1000, tableBasePath, sqlContext);
-      assertDistanceCount(1000, tableBasePath, sqlContext);
-      assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
-      lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00000", 
tableBasePath, fs, 1);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    assertDistanceCount(1000, tableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
+    String lastInstantForUpstreamTable = 
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // Now incrementally pull from the above hudi table and ingest to second 
table
     HoodieDeltaStreamer.Config downstreamCfg =
         TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, 
downstreamTableBasePath, WriteOperationType.BULK_INSERT,
             true, null);
     addRecordMerger(recordType, downstreamCfg.configs);
-    ds = new HoodieDeltaStreamer(downstreamCfg, jsc, fs, 
hiveServer.getHiveConf());
-    try {
-      ds.sync();
-      assertRecordCount(1000, downstreamTableBasePath, sqlContext);
-      assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
-      assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
-      TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(downstreamCfg, jsc, fs, 
hiveServer.getHiveConf()).sync();
+    assertRecordCount(1000, downstreamTableBasePath, sqlContext);
+    assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
+    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
 
     // No new data => no commits for upstream table
     cfg.sourceLimit = 0;
-    ds = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
-    try {
-      ds.sync();
-      assertRecordCount(1000, tableBasePath, sqlContext);
-      assertDistanceCount(1000, tableBasePath, sqlContext);
-      assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    assertDistanceCount(1000, tableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // with no change in upstream table, no change in downstream too when 
pulled.
     HoodieDeltaStreamer.Config downstreamCfg1 =
         TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, 
downstreamTableBasePath,
             WriteOperationType.BULK_INSERT, true, 
DummySchemaProvider.class.getName());
-    ds = new HoodieDeltaStreamer(downstreamCfg1, jsc);
-    try {
-      ds.sync();
-      assertRecordCount(1000, downstreamTableBasePath, sqlContext);
-      assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
-      assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
-      TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(downstreamCfg1, jsc).sync();
+    assertRecordCount(1000, downstreamTableBasePath, sqlContext);
+    assertDistanceCount(1000, downstreamTableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1000, downstreamTableBasePath, 
sqlContext);
+    TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 1);
 
     // upsert() #1 on upstream hudi table
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
-    ds = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
-    try {
-      ds.sync();
-      assertRecordCount(1950, tableBasePath, sqlContext);
-      assertDistanceCount(1950, tableBasePath, sqlContext);
-      assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext);
-      lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", 
tableBasePath, fs, 2);
-      List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
-      assertEquals(1950, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+    assertRecordCount(1950, tableBasePath, sqlContext);
+    assertDistanceCount(1950, tableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(1950, tableBasePath, sqlContext);
+    lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", 
tableBasePath, fs, 2);
+    List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1950, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
     // Incrementally pull changes in upstream hudi table and apply to 
downstream table
     downstreamCfg =
@@ -1628,19 +1537,14 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             false, null);
     addRecordMerger(recordType, downstreamCfg.configs);
     downstreamCfg.sourceLimit = 2000;
-    ds = new HoodieDeltaStreamer(downstreamCfg, jsc);
-    try {
-      ds.sync();
-      assertRecordCount(2000, downstreamTableBasePath, sqlContext);
-      assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
-      assertDistanceCountWithExactValue(2000, downstreamTableBasePath, 
sqlContext);
-      String finalInstant =
-          TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 2);
-      List<Row> counts = countsPerCommit(downstreamTableBasePath, sqlContext);
-      assertEquals(2000, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+    assertRecordCount(2000, downstreamTableBasePath, sqlContext);
+    assertDistanceCount(2000, downstreamTableBasePath, sqlContext);
+    assertDistanceCountWithExactValue(2000, downstreamTableBasePath, 
sqlContext);
+    String finalInstant =
+        TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, 
downstreamTableBasePath, fs, 2);
+    counts = countsPerCommit(downstreamTableBasePath, sqlContext);
+    assertEquals(2000, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
 
     // Test Hive integration
     HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, 
"hive_trips");
@@ -1677,27 +1581,18 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, "MERGE_ON_READ");
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs, 
hiveServer.getHiveConf());
-    try {
-      ds.sync();
-      assertRecordCount(1000, dataSetBasePath, sqlContext);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+    assertRecordCount(1000, dataSetBasePath, sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, true, DummyAvroPayload.class.getName(), "MERGE_ON_READ");
-    try {
-      ds = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
 
-      //now assert that hoodie.properties file now has updated payload class 
name
-      HoodieTableMetaClient metaClient = createMetaClient(jsc, 
dataSetBasePath, false);
-      assertEquals(metaClient.getTableConfig().getPayloadClass(), 
DummyAvroPayload.class.getName());
-    } finally {
-      ds.shutdownGracefully();
-    }
+    //now assert that hoodie.properties file now has updated payload class name
+    HoodieTableMetaClient metaClient = createMetaClient(jsc, dataSetBasePath, 
false);
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), 
DummyAvroPayload.class.getName());
   }
 
   @Test
@@ -1706,17 +1601,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
           Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
           true, true, PartialUpdateAvroPayload.class.getName(), 
"MERGE_ON_READ");
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs, 
hiveServer.getHiveConf());
-    try {
-      ds.sync();
-      assertRecordCount(1000, dataSetBasePath, sqlContext);
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+    assertRecordCount(1000, dataSetBasePath, sqlContext);
 
-      //now assert that hoodie.properties file now has updated payload class 
name
-      HoodieTableMetaClient metaClient = createMetaClient(jsc, 
dataSetBasePath, false);
-      assertEquals(metaClient.getTableConfig().getPayloadClass(), 
PartialUpdateAvroPayload.class.getName());
-    } finally {
-      ds.shutdownGracefully();
-    }
+    //now assert that hoodie.properties file now has updated payload class name
+    HoodieTableMetaClient metaClient = createMetaClient(jsc, dataSetBasePath, 
false);
+    assertEquals(metaClient.getTableConfig().getPayloadClass(), 
PartialUpdateAvroPayload.class.getName());
   }
 
   @Test
@@ -1725,33 +1615,24 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, null);
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs, 
hiveServer.getHiveConf());
-    try {
-      ds.sync();
-      assertRecordCount(1000, dataSetBasePath, sqlContext);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+    assertRecordCount(1000, dataSetBasePath, sqlContext);
 
     //now create one more deltaStreamer instance and update payload class
     cfg = TestHelpers.makeConfig(dataSetBasePath, 
WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), 
PROPS_FILENAME_TEST_SOURCE, false,
         true, true, DummyAvroPayload.class.getName(), null);
-    ds = new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
-
-    try {
-      //now assert that hoodie.properties file does not have payload class 
prop since it is a COW table
-      Properties props = new Properties();
-      String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
-      FileSystem fs = FSUtils.getFs(cfg.targetBasePath, 
jsc.hadoopConfiguration());
-      try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
-        props.load(inputStream);
-      }
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf());
 
-      
assertFalse(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
-    } finally {
-      ds.shutdownGracefully();
+    //now assert that hoodie.properties file does not have payload class prop 
since it is a COW table
+    Properties props = new Properties();
+    String metaPath = dataSetBasePath + "/.hoodie/hoodie.properties";
+    FileSystem fs = FSUtils.getFs(cfg.targetBasePath, 
jsc.hadoopConfiguration());
+    try (FSDataInputStream inputStream = fs.open(new Path(metaPath))) {
+      props.load(inputStream);
     }
+
+    assertFalse(props.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()));
   }
 
   @ParameterizedTest
@@ -1762,12 +1643,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
     addRecordMerger(recordType, cfg.configs);
-    HoodieDeltaStreamer ds1 = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds1.sync();
-    } finally {
-      ds1.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
@@ -1775,12 +1651,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.filterDupes = true;
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.INSERT;
-    HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds2.sync();
-    } finally {
-      ds2.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
     assertRecordCount(2000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
     // 1000 records for commit 00000 & 1000 for commit 00001
@@ -1797,12 +1668,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg2.sourceLimit = 2000;
     cfg2.operation = WriteOperationType.UPSERT;
     cfg2.configs.add(String.format("%s=false", 
HoodieCleanConfig.AUTO_CLEAN.key()));
-    HoodieDeltaStreamer ds3 = new HoodieDeltaStreamer(cfg2, jsc);
-    try {
-      ds3.sync();
-    } finally {
-      ds3.shutdownGracefully();
-    }
+    HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
+    ds2.sync();
     mClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
     HoodieInstant newLastFinished = 
mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
     
assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), 
HoodieTimeline.GREATER_THAN, lastFinished.getTimestamp()
@@ -1817,13 +1684,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // Try UPSERT with filterDupes true. Expect exception
     cfg2.filterDupes = true;
     cfg2.operation = WriteOperationType.UPSERT;
-    HoodieDeltaStreamer ds4 = new HoodieDeltaStreamer(cfg2, jsc);
     try {
-      ds4.sync();
+      new HoodieDeltaStreamer(cfg2, jsc).sync();
     } catch (IllegalArgumentException e) {
       assertTrue(e.getMessage().contains("'--filter-dupes' needs to be 
disabled when '--op' is 'UPSERT' to ensure updates are not missed."));
-    } finally {
-      ds4.shutdownGracefully();
     }
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
@@ -1876,45 +1740,42 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
             useSchemaProvider, 100000, false, null, null, "timestamp", null);
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
-    HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      if (testEmptyBatch) {
-        prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null);
-        prepareParquetDFSSource(useSchemaProvider, hasTransformer, 
"source.avsc", "target.avsc",
-                PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, 
"partition_path", "0");
-        deltaStreamer1.sync();
-        // since we mimic'ed empty batch, total records should be same as 
first sync().
-        assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
-        HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
-
-        // validate table schema fetches valid schema from last but one commit.
-        TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
-        assertNotEquals(tableSchemaResolver.getTableAvroSchema(), 
Schema.create(Schema.Type.NULL).toString());
-        // schema from latest commit and last but one commit should match
-        compareLatestTwoSchemas(metaClient);
-        prepareParquetDFSSource(useSchemaProvider, hasTransformer, 
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
-                PARQUET_SOURCE_ROOT, false, "partition_path", "");
-      }
+    deltaStreamer.sync();
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+    deltaStreamer.shutdownGracefully();
 
-      // proceed w/ non empty batch.
-      prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, 
null, null);
-      deltaStreamer.sync();
-      assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
-      // validate commit metadata for all completed commits to have valid 
schema in extra metadata.
+    if (testEmptyBatch) {
+      prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "2.parquet", false, 
null, null);
+      prepareParquetDFSSource(useSchemaProvider, hasTransformer, 
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+          PARQUET_SOURCE_ROOT, false, "partition_path", "0");
+      HoodieDeltaStreamer deltaStreamer1 = new HoodieDeltaStreamer(cfg, jsc);
+      deltaStreamer1.sync();
+      // since we mimic'ed empty batch, total records should be same as first 
sync().
+      assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
       HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
-      
metaClient.reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants()
-              .forEach(entry -> 
assertValidSchemaAndOperationTypeInCommitMetadata(
-                      entry, metaClient, WriteOperationType.INSERT));
-      testNum++;
-    } finally {
+
+      // validate table schema fetches valid schema from last but one commit.
+      TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
+      assertNotEquals(tableSchemaResolver.getTableAvroSchema(), 
Schema.create(Schema.Type.NULL).toString());
+      // schema from latest commit and last but one commit should match
+      compareLatestTwoSchemas(metaClient);
+      prepareParquetDFSSource(useSchemaProvider, hasTransformer, 
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
+          PARQUET_SOURCE_ROOT, false, "partition_path", "");
       deltaStreamer1.shutdownGracefully();
     }
+
+    // proceed w/ non empty batch.
+    prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, "3.parquet", false, null, 
null);
+    deltaStreamer.sync();
+    assertRecordCount(parquetRecordsCount + 100, tableBasePath, sqlContext);
+    // validate commit metadata for all completed commits to have valid schema 
in extra metadata.
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(jsc.hadoopConfiguration()).build();
+    metaClient.reloadActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants().getInstants()
+        .forEach(entry -> assertValidSchemaAndOperationTypeInCommitMetadata(
+            entry, metaClient, WriteOperationType.INSERT));
+    testNum++;
+    deltaStreamer.shutdownGracefully();
   }
 
   private void assertValidSchemaAndOperationTypeInCommitMetadata(HoodieInstant 
instant,
@@ -1961,13 +1822,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ORCDFSSource.class.getName(),
             transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
             useSchemaProvider, 100000, false, null, null, "timestamp", null), 
jsc);
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(ORC_NUM_RECORDS, tableBasePath, sqlContext);
-      testNum++;
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    deltaStreamer.sync();
+    assertRecordCount(ORC_NUM_RECORDS, tableBasePath, sqlContext);
+    testNum++;
   }
 
   private void prepareJsonKafkaDFSSource(String propsFileName, String 
autoResetValue, String topicName) throws IOException {
@@ -2015,12 +1872,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
             Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
             true, 100000, false, null, null, "timestamp", null), jsc);
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(parquetRecords, tableBasePath, sqlContext);
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    deltaStreamer.sync();
+    assertRecordCount(parquetRecords, tableBasePath, sqlContext);
+    deltaStreamer.shutdownGracefully();
 
     // prep json kafka source
     topicName = "topic" + testNum;
@@ -2031,21 +1885,17 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
JsonKafkaSource.class.getName(),
             Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
             true, 100000, false, null, null, "timestamp", null), jsc);
-    try {
-      deltaStreamer.sync();
-      // if auto reset value is set to LATEST, this all kafka records so far 
may not be synced.
-      int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : 
JSON_KAFKA_NUM_RECORDS);
-      assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
+    deltaStreamer.sync();
+    // if auto reset value is set to LATEST, this all kafka records so far may 
not be synced.
+    int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : 
JSON_KAFKA_NUM_RECORDS);
+    assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
 
-      // verify 2nd batch to test LATEST auto reset value.
-      prepareJsonKafkaDFSFiles(20, false, topicName);
-      totalExpectedRecords += 20;
-      deltaStreamer.sync();
-      assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
-      testNum++;
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    // verify 2nd batch to test LATEST auto reset value.
+    prepareJsonKafkaDFSFiles(20, false, topicName);
+    totalExpectedRecords += 20;
+    deltaStreamer.sync();
+    assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
+    testNum++;
   }
 
   @Test
@@ -2058,20 +1908,15 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
JsonKafkaSource.class.getName(),
             Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
             true, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
 
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
-
-      int totalRecords = JSON_KAFKA_NUM_RECORDS;
-      int records = 10;
-      totalRecords += records;
-      prepareJsonKafkaDFSFiles(records, false, topicName);
-      deltaStreamer.sync();
-      assertRecordCount(totalRecords, tableBasePath, sqlContext);
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    int totalRecords = JSON_KAFKA_NUM_RECORDS;
+    int records = 10;
+    totalRecords += records;
+    prepareJsonKafkaDFSFiles(records, false, topicName);
+    deltaStreamer.sync();
+    assertRecordCount(totalRecords, tableBasePath, sqlContext);
   }
 
   @Test
@@ -2088,31 +1933,27 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
JsonKafkaSource.class.getName(),
             Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
             true, 100000, false, null, null, "timestamp", null), jsc);
-    try {
-      deltaStreamer.sync();
-      sqlContext.clearCache();
-      Dataset<Row> ds = 
sqlContext.read().format("org.apache.hudi").load(tableBasePath);
-      assertEquals(numRecords, ds.count());
-      //ensure that kafka partition column exists and is populated correctly
-      for (int i = 0; i < numPartitions; i++) {
-        assertEquals(recsPerPartition, ds.filter(KAFKA_SOURCE_PARTITION_COLUMN 
+ "=" + i).count());
-      }
+    deltaStreamer.sync();
+    sqlContext.clearCache();
+    Dataset<Row> ds = 
sqlContext.read().format("org.apache.hudi").load(tableBasePath);
+    assertEquals(numRecords, ds.count());
+    //ensure that kafka partition column exists and is populated correctly
+    for (int i = 0; i < numPartitions; i++) {
+      assertEquals(recsPerPartition, ds.filter(KAFKA_SOURCE_PARTITION_COLUMN + 
"=" + i).count());
+    }
 
-      //ensure that kafka timestamp column exists and is populated correctly
-      long afterTime = Instant.now().toEpochMilli();
-      assertEquals(numRecords, ds.filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + ">" + 
beforeTime).filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + "<" + afterTime).count());
+    //ensure that kafka timestamp column exists and is populated correctly
+    long afterTime = Instant.now().toEpochMilli();
+    assertEquals(numRecords, ds.filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + ">" + 
beforeTime).filter(KAFKA_SOURCE_TIMESTAMP_COLUMN + "<" + afterTime).count());
 
 
-      //ensure that kafka offset column exists and is populated correctly
-      
sqlContext.read().format("org.apache.hudi").load(tableBasePath).col(KAFKA_SOURCE_OFFSET_COLUMN);
-      for (int i = 0; i < recsPerPartition; i++) {
-        for (int j = 0; j < numPartitions; j++) {
-          //each offset partition pair should be unique
-          assertEquals(1, ds.filter(KAFKA_SOURCE_OFFSET_COLUMN + "=" + 
i).filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + j).count());
-        }
+    //ensure that kafka offset column exists and is populated correctly
+    
sqlContext.read().format("org.apache.hudi").load(tableBasePath).col(KAFKA_SOURCE_OFFSET_COLUMN);
+    for (int i = 0; i < recsPerPartition; i++) {
+      for (int j = 0; j < numPartitions; j++) {
+        //each offset partition pair should be unique
+        assertEquals(1, ds.filter(KAFKA_SOURCE_OFFSET_COLUMN + "=" + 
i).filter(KAFKA_SOURCE_PARTITION_COLUMN + "=" + j).count());
       }
-    } finally {
-      deltaStreamer.shutdownGracefully();
     }
   }
 
@@ -2128,22 +1969,17 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
             true, 100000, false, null,
             null, "timestamp", String.valueOf(System.currentTimeMillis())), 
jsc);
-    try {
-      deltaStreamer.sync();
-      deltaStreamer.shutdownGracefully();
-      assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
+    deltaStreamer.sync();
+    assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
 
-      prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
-      deltaStreamer = new HoodieDeltaStreamer(
-          TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
JsonKafkaSource.class.getName(),
-              Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
-              true, 100000, false, null, null,
-              "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
-      deltaStreamer.sync();
-      assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
+    deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
JsonKafkaSource.class.getName(),
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, 100000, false, null, null,
+            "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
+    deltaStreamer.sync();
+    assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
   }
 
   @Disabled("HUDI-6609")
@@ -2262,40 +2098,36 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
testInitFailure ? TestParquetDFSSourceEmptyBatch.class.getName() : 
ParquetDFSSource.class.getName(),
             null, PROPS_FILENAME_TEST_PARQUET, false,
             useSchemaProvider, 100000, false, null, null, "timestamp", null), 
jsc);
-    try {
-      deltaStreamer.sync();
+    deltaStreamer.sync();
 
-      if (testInitFailure) {
-        FileStatus[] fileStatuses = fs.listStatus(new Path(tableBasePath + 
"/.hoodie/"));
-        Arrays.stream(fileStatuses).filter(entry -> 
entry.getPath().getName().contains("commit") || 
entry.getPath().getName().contains("inflight")).forEach(entry -> {
-          try {
-            fs.delete(entry.getPath());
-          } catch (IOException e) {
-            LOG.warn("Failed to delete " + entry.getPath().toString(), e);
-          }
-        });
-      }
-      // delete hoodie.properties
-      fs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));
-
-      // restart the pipeline.
-      if (testInitFailure) { // should succeed.
-        deltaStreamer = new HoodieDeltaStreamer(
-            TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
-                null, PROPS_FILENAME_TEST_PARQUET, false,
-                useSchemaProvider, 100000, false, null, null, "timestamp", 
null), jsc);
-        deltaStreamer.sync();
-        assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
-      } else {
-        assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(
-            TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
-                null, PROPS_FILENAME_TEST_PARQUET, false,
-                useSchemaProvider, 100000, false, null, null, "timestamp", 
null), jsc));
-      }
-    } finally {
-      deltaStreamer.shutdownGracefully();
-      testNum++;
+    if (testInitFailure) {
+      FileStatus[] fileStatuses = fs.listStatus(new Path(tableBasePath + 
"/.hoodie/"));
+      Arrays.stream(fileStatuses).filter(entry -> 
entry.getPath().getName().contains("commit") || 
entry.getPath().getName().contains("inflight")).forEach(entry -> {
+        try {
+          fs.delete(entry.getPath());
+        } catch (IOException e) {
+          LOG.warn("Failed to delete " + entry.getPath().toString(), e);
+        }
+      });
+    }
+    // delete hoodie.properties
+    fs.delete(new Path(tableBasePath + "/.hoodie/hoodie.properties"));
+
+    // restart the pipeline.
+    if (testInitFailure) { // should succeed.
+      deltaStreamer = new HoodieDeltaStreamer(
+          TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
+              null, PROPS_FILENAME_TEST_PARQUET, false,
+              useSchemaProvider, 100000, false, null, null, "timestamp", 
null), jsc);
+      deltaStreamer.sync();
+      assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
+    } else {
+      assertThrows(HoodieIOException.class, () -> new HoodieDeltaStreamer(
+          TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
+              null, PROPS_FILENAME_TEST_PARQUET, false,
+              useSchemaProvider, 100000, false, null, null, "timestamp", 
null), jsc));
     }
+    testNum++;
   }
 
   @Test
@@ -2381,13 +2213,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             tableBasePath, WriteOperationType.INSERT, 
CsvDFSSource.class.getName(),
             transformerClassNames, PROPS_FILENAME_TEST_CSV, false,
             useSchemaProvider, 1000, false, null, null, sourceOrderingField, 
null), jsc);
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(CSV_NUM_RECORDS, tableBasePath, sqlContext);
-    } finally {
-      deltaStreamer.shutdownGracefully();
-      testNum++;
-    }
+    deltaStreamer.sync();
+    assertRecordCount(CSV_NUM_RECORDS, tableBasePath, sqlContext);
+    testNum++;
   }
 
   @Test
@@ -2505,12 +2333,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
             tableBasePath, WriteOperationType.INSERT, 
SqlSource.class.getName(),
             Collections.emptyList(), PROPS_FILENAME_TEST_SQL_SOURCE, false,
             false, 1000, false, null, null, "timestamp", null, true), jsc);
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    deltaStreamer.sync();
+    assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
   }
 
   @Disabled
@@ -2562,12 +2386,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, 
downstreamTableBasePath,
             WriteOperationType.BULK_INSERT, true, null);
     
downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=1");
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(downstreamCfg, jsc);
-    try {
-      ds.sync();
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
 
     insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
     //No change as this fails with Path not exist error
@@ -2584,18 +2403,8 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     //Adding this conf to make testing easier :)
     
downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
     downstreamCfg.operation = WriteOperationType.UPSERT;
-    ds = new HoodieDeltaStreamer(downstreamCfg, jsc);
-    try {
-      ds.sync();
-    } finally {
-      ds.shutdownGracefully();
-    }
-    ds = new HoodieDeltaStreamer(downstreamCfg, jsc);
-    try {
-      ds.sync();
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
+    new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
 
     long baseTableRecords = 
sqlContext.read().format("org.apache.hudi").load(tableBasePath).count();
     long downStreamTableRecords = 
sqlContext.read().format("org.apache.hudi").load(downstreamTableBasePath).count();
@@ -2612,6 +2421,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add("hoodie.keep.min.commits=4");
     cfg.configs.add("hoodie.keep.max.commits=5");
     cfg.configs.add("hoodie.test.source.generate.inserts=true");
+
     for (int i = 0; i < count; i++) {
       new HoodieDeltaStreamer(cfg, jsc).sync();
     }
@@ -2643,13 +2453,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, 
ParquetDFSSource.class.getName(),
             null, PROPS_FILENAME_TEST_PARQUET, false,
             false, 100000, false, null, null, "timestamp", null), jsc);
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
-    } finally {
-      deltaStreamer.shutdownGracefully();
-      testNum++;
-    }
+    deltaStreamer.sync();
+    assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
+    testNum++;
 
     prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
     prepareParquetDFSSource(false, false);
@@ -2658,16 +2464,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(),
             
Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), 
PROPS_FILENAME_TEST_PARQUET, false,
             false, 100000, false, null, null, "timestamp", null), jsc);
-    try {
-      deltaStreamer.sync();
-      // No records should match the 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
-      assertNoPartitionMatch(tableBasePath, sqlContext, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+    deltaStreamer.sync();
+    // No records should match the 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
+    assertNoPartitionMatch(tableBasePath, sqlContext, 
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
 
-      // There should not be any fileIDs in the deleted partition
-      assertTrue(getAllFileIDsInTable(tableBasePath, 
Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    // There should not be any fileIDs in the deleted partition
+    assertTrue(getAllFileIDsInTable(tableBasePath, 
Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
   }
 
   @Test
@@ -2691,15 +2493,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // Initial insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
     addRecordMerger(recordType, cfg.configs);
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-      assertRecordCount(1000, tableBasePath, sqlContext);
-      assertDistanceCount(1000, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    assertDistanceCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
     // Collect the fileIds before running HoodieDeltaStreamer
     Set<String> beforeFileIDs = getAllFileIDsInTable(tableBasePath, 
Option.empty());
@@ -2708,40 +2505,29 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.operation = operationType;
     // No new data => no commits.
     cfg.sourceLimit = 0;
-    ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-
-      if (operationType == WriteOperationType.INSERT_OVERWRITE) {
-        assertRecordCount(1000, tableBasePath, sqlContext);
-        assertDistanceCount(1000, tableBasePath, sqlContext);
-        TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
-      } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
-        HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
-        final HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, 
metaClient.getCommitsAndCompactionTimeline());
-        assertEquals(0, fsView.getLatestFileSlices("").count());
-        TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
-
-        // Since the table has been overwritten all fileIDs before should have 
been replaced
-        Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, 
Option.empty());
-        assertTrue(afterFileIDs.isEmpty());
-      }
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
 
+    if (operationType == WriteOperationType.INSERT_OVERWRITE) {
+      assertRecordCount(1000, tableBasePath, sqlContext);
+      assertDistanceCount(1000, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
+      final HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, 
metaClient.getCommitsAndCompactionTimeline());
+      assertEquals(0, fsView.getLatestFileSlices("").count());
+      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
-    cfg.sourceLimit = 1000;
-    ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-      assertRecordCount(950, tableBasePath, sqlContext);
-      assertDistanceCount(950, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
-      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
-    } finally {
-      ds.shutdownGracefully();
+      // Since the table has been overwritten all fileIDs before should have 
been replaced
+      Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, 
Option.empty());
+      assertTrue(afterFileIDs.isEmpty());
     }
+
+    cfg.sourceLimit = 1000;
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    assertRecordCount(950, tableBasePath, sqlContext);
+    assertDistanceCount(950, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
   @Test
@@ -2786,24 +2572,20 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     addRecordMerger(recordType, cfg.configs);
     cfg.configs.add(String.format("%s=%s", 
HoodieTableConfig.DROP_PARTITION_COLUMNS.key(), "true"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds.sync();
-      // assert ingest successful
-      TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
-
-      TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
-          
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
-      // get schema from data file written in the latest commit
-      Schema tableSchema = 
tableSchemaResolver.getTableAvroSchemaFromDataFile();
-      assertNotNull(tableSchema);
-
-      List<String> tableFields = 
tableSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
-      // now assert that the partition column is not in the target schema
-      assertFalse(tableFields.contains("partition_path"));
-      UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
-    } finally {
-      ds.shutdownGracefully();
-    }
+    ds.sync();
+    // assert ingest successful
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
+            
HoodieTableMetaClient.builder().setBasePath(tableBasePath).setConf(fs.getConf()).build());
+    // get schema from data file written in the latest commit
+    Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
+    assertNotNull(tableSchema);
+
+    List<String> tableFields = 
tableSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+    // now assert that the partition column is not in the target schema
+    assertFalse(tableFields.contains("partition_path"));
+    UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
   @Test
@@ -2816,20 +2598,15 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.enableMetaSync = true;
     cfg.forceEmptyMetaSync = true;
 
-    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc, fs, 
hiveServer.getHiveConf());
-    try {
-      ds.sync();
-      assertRecordCount(0, tableBasePath, sqlContext);
-
-      // make sure hive table is present
-      HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, 
"hive_trips");
-      hiveSyncConfig.setHadoopConf(hiveServer.getHiveConf());
-      HoodieHiveSyncClient hiveClient = new 
HoodieHiveSyncClient(hiveSyncConfig);
-      final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
-      assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " 
should exist");
-    } finally {
-      ds.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc, fs, hiveServer.getHiveConf()).sync();
+    assertRecordCount(0, tableBasePath, sqlContext);
+
+    // make sure hive table is present
+    HiveSyncConfig hiveSyncConfig = getHiveSyncConfig(tableBasePath, 
"hive_trips");
+    hiveSyncConfig.setHadoopConf(hiveServer.getHiveConf());
+    HoodieHiveSyncClient hiveClient = new HoodieHiveSyncClient(hiveSyncConfig);
+    final String tableName = hiveSyncConfig.getString(META_SYNC_TABLE_NAME);
+    assertTrue(hiveClient.tableExists(tableName), "Table " + tableName + " 
should exist");
   }
 
   @Test
@@ -2837,15 +2614,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     String tableBasePath = basePath + 
"/test_resume_checkpoint_after_changing_cow_to_mor";
     // default table type is COW
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.BULK_INSERT);
-    HoodieDeltaStreamer ds1 = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds1.sync();
-      assertRecordCount(1000, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
-      TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
-    } finally {
-      ds1.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    assertRecordCount(1000, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
 
     // change cow to mor
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
@@ -2864,39 +2636,29 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // continue deltastreamer
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
     cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
-    HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds2.sync();
-      // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
-      assertRecordCount(1450, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
-      List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
-      assertEquals(1450, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-      TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
-      // currently there should be 1 deltacommits now
-      TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
-    } finally {
-      ds2.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
+    assertRecordCount(1450, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
+    List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1450, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    // currently there should be 1 deltacommits now
+    TestHelpers.assertAtleastNDeltaCommits(1, tableBasePath, fs);
 
     // test the table type is already mor
-    HoodieDeltaStreamer ds3 = new HoodieDeltaStreamer(cfg, jsc);
-    try {
-      ds3.sync();
-      // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
-      // total records should be 1900 now
-      assertRecordCount(1900, tableBasePath, sqlContext);
-      TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
-      List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
-      assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
-      TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
-      // currently there should be 2 deltacommits now
-      TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
-    } finally {
-      // clean up
-      ds3.shutdownGracefully();
-    }
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    // out of 1000 new records, 500 are inserts, 450 are updates and 50 are 
deletes.
+    // total records should be 1900 now
+    assertRecordCount(1900, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata("00002", tableBasePath, fs, 3);
+    counts = countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1900, counts.stream().mapToLong(entry -> 
entry.getLong(1)).sum());
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath, fs);
+    // currently there should be 2 deltacommits now
+    TestHelpers.assertAtleastNDeltaCommits(2, tableBasePath, fs);
 
+    // clean up
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
@@ -2984,21 +2746,13 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
         transformerClassNames, PROPS_FILENAME_TEST_PARQUET, false,
         useSchemaProvider, 100000, false, null, null, "timestamp", null);
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(config, jsc);
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    deltaStreamer.sync();
+    assertRecordCount(parquetRecordsCount, tableBasePath, sqlContext);
 
     prepareParquetDFSFiles(200, PARQUET_SOURCE_ROOT, "2.parquet", false, null, 
null);
-    try {
-      deltaStreamer.sync();
-      assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext);
-      testNum++;
-    } finally {
-      deltaStreamer.shutdownGracefully();
-    }
+    deltaStreamer.sync();
+    assertRecordCount(parquetRecordsCount + 200, tableBasePath, sqlContext);
+    testNum++;
   }
 
   @ParameterizedTest

Reply via email to