This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cd7623e  All opened hoodie clients in tests need to be closed;
TestMergeOnReadTable must use embedded timeline server
cd7623e is described below

commit cd7623e2160abd43ea44f48a9640ea8fa0bb6db3
Author: Balaji Varadarajan <varad...@uber.com>
AuthorDate: Wed Jun 12 18:28:49 2019 -0700

    All opened hoodie clients in tests need to be closed.
    TestMergeOnReadTable must use embedded timeline server.
---
 .../java/com/uber/hoodie/TestAsyncCompaction.java  |  2 +-
 .../java/com/uber/hoodie/TestClientRollback.java   |  6 +--
 .../java/com/uber/hoodie/TestHoodieClientBase.java | 37 ++++++++++++-----
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 14 +++----
 .../java/com/uber/hoodie/TestHoodieReadClient.java | 12 +++---
 .../src/test/java/com/uber/hoodie/TestMultiFS.java | 23 ++++++++++-
 .../java/com/uber/hoodie/index/TestHbaseIndex.java | 22 ++++++++--
 .../com/uber/hoodie/io/TestHoodieCompactor.java    | 18 +++++++-
 .../com/uber/hoodie/io/TestHoodieMergeHandle.java  | 18 +++++++-
 .../uber/hoodie/table/TestMergeOnReadTable.java    | 48 ++++++++++++++--------
 10 files changed, 146 insertions(+), 54 deletions(-)

diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java
index a86edc4..4fcc32a 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java
@@ -92,7 +92,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase 
{
   public void testRollbackForInflightCompaction() throws Exception {
     // Rollback inflight compaction
     HoodieWriteConfig cfg = getConfig(false);
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true);
+    HoodieWriteClient client = getHoodieWriteClient(cfg, true);
 
     String firstInstantTime = "001";
     String secondInstantTime = "004";
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java
index f08c343..8a6f18d 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java
@@ -204,7 +204,7 @@ public class TestClientRollback extends 
TestHoodieClientBase {
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).withIndexConfig(
         
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
 
-    HoodieWriteClient client = new HoodieWriteClient(jsc, config, false);
+    HoodieWriteClient client = getHoodieWriteClient(config, false);
 
     // Rollback commit 1 (this should fail, since commit2 is still around)
     try {
@@ -294,7 +294,7 @@ public class TestClientRollback extends 
TestHoodieClientBase {
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath).withIndexConfig(
         
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
 
-    new HoodieWriteClient(jsc, config, false);
+    getHoodieWriteClient(config, false);
 
     // Check results, nothing changed
     assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1));
@@ -311,7 +311,7 @@ public class TestClientRollback extends 
TestHoodieClientBase {
         && HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", 
commitTime1, file13));
 
     // Turn auto rollback on
-    new HoodieWriteClient(jsc, config, true).startCommit();
+    getHoodieWriteClient(config, true).startCommit();
     assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1));
     assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime2));
     assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime3));
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
index 850f45f..a668d3b 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
@@ -81,27 +81,43 @@ public class TestHoodieClientBase implements Serializable {
   protected transient HoodieTestDataGenerator dataGen = null;
 
   private HoodieWriteClient writeClient;
+  private HoodieReadClient readClient;
 
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) 
throws Exception {
-    closeClient();
-    writeClient = new HoodieWriteClient(jsc, cfg);
-    return writeClient;
+  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+    return getHoodieWriteClient(cfg, false);
   }
 
-  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, 
boolean rollbackInflightCommit)
-      throws Exception {
-    closeClient();
-    writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit);
+  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, 
boolean rollbackInflightCommit) {
+    return getHoodieWriteClient(cfg, rollbackInflightCommit, 
HoodieIndex.createIndex(cfg, jsc));
+  }
+
+  protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, 
boolean rollbackInflightCommit,
+      HoodieIndex index) {
+    closeWriteClient();
+    writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, 
index);
     return writeClient;
   }
 
-  private void closeClient() {
+  protected HoodieReadClient getHoodieReadClient(String basePath) {
+    closeReadClient();
+    readClient = new HoodieReadClient(jsc, basePath);
+    return readClient;
+  }
+
+  private void closeWriteClient() {
     if (null != writeClient) {
       writeClient.close();
       writeClient = null;
     }
   }
 
+  private void closeReadClient() {
+    if (null != readClient) {
+      readClient.close();
+      readClient = null;
+    }
+  }
+
   @Before
   public void init() throws IOException {
     // Initialize a local spark env
@@ -132,7 +148,8 @@ public class TestHoodieClientBase implements Serializable {
    * Properly release resources at end of each test
    */
   public void tearDown() throws IOException {
-    closeClient();
+    closeWriteClient();
+    closeReadClient();
 
     if (null != sqlContext) {
       logger.info("Clearing sql context cache of spark-session used in 
previous test-case");
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
index 6bcfcd9..aa53d9f 100644
--- 
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
@@ -215,8 +215,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
TestHoodieClientBase {
     assertNodupesWithinPartition(dedupedRecs);
 
     // Perform write-action and check
-    HoodieWriteClient client = new HoodieWriteClient(jsc,
-        getConfigBuilder().combineInput(true, true).build());
+    HoodieWriteClient client = getHoodieWriteClient(
+        getConfigBuilder().combineInput(true, true).build(), false);
     client.startCommitWithTime(newCommitTime);
     List<WriteStatus> statuses = writeFn.apply(client, records, 
newCommitTime).collect();
     assertNoWriteErrors(statuses);
@@ -236,7 +236,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
TestHoodieClientBase {
   private HoodieWriteClient getWriteClientWithDummyIndex(final boolean 
isGlobal) throws Exception {
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(isGlobal);
-    return new HoodieWriteClient(jsc, getConfigBuilder().build(), false, 
index);
+    return getHoodieWriteClient(getConfigBuilder().build(), false, index);
   }
 
   /**
@@ -267,7 +267,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
TestHoodieClientBase {
   private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig,
       Function3<JavaRDD<WriteStatus>, HoodieWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn,
       boolean isPrepped) throws Exception {
-    HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false);
 
     //Write 1 (only inserts)
     String newCommitTime = "001";
@@ -291,7 +291,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
TestHoodieClientBase {
    */
   @Test
   public void testDeletes() throws Exception {
-    HoodieWriteClient client = new HoodieWriteClient(jsc, getConfig());
+    HoodieWriteClient client = getHoodieWriteClient(getConfig(), false);
 
     /**
      * Write 1 (inserts and deletes)
@@ -347,7 +347,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
TestHoodieClientBase {
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // 
hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
 
-    HoodieWriteClient client = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient client = getHoodieWriteClient(config, false);
 
     // Inserts => will write file1
     String commitTime1 = "001";
@@ -457,7 +457,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
TestHoodieClientBase {
     // setup the small file handling params
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // 
hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
-    HoodieWriteClient client = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient client = getHoodieWriteClient(config, false);
 
     // Inserts => will write file1
     String commitTime1 = "001";
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java
index 6608d81..2101577 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java
@@ -89,12 +89,12 @@ public class TestHoodieReadClient extends 
TestHoodieClientBase {
    */
   private void testReadFilterExist(HoodieWriteConfig config,
       Function3<JavaRDD<WriteStatus>, HoodieWriteClient, 
JavaRDD<HoodieRecord>, String> writeFn) throws Exception {
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getHoodieWriteClient(config);
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
     JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1);
 
-    HoodieReadClient readClient = new HoodieReadClient(jsc, 
config.getBasePath());
+    HoodieReadClient readClient = getHoodieReadClient(config.getBasePath());
     JavaRDD<HoodieRecord> filteredRDD = readClient.filterExists(recordsRDD);
 
     // Should not find any files
@@ -106,7 +106,7 @@ public class TestHoodieReadClient extends 
TestHoodieClientBase {
     // Verify there are no errors
     assertNoWriteErrors(statuses);
 
-    readClient = new HoodieReadClient(jsc, config.getBasePath());
+    readClient = getHoodieReadClient(config.getBasePath());
     filteredRDD = readClient.filterExists(recordsRDD);
     List<HoodieRecord> result = filteredRDD.collect();
     // Check results
@@ -166,7 +166,7 @@ public class TestHoodieReadClient extends 
TestHoodieClientBase {
       Function3<JavaRDD<WriteStatus>, HoodieWriteClient, 
JavaRDD<HoodieRecord>, String> updateFn,
       boolean isPrepped)
       throws Exception {
-    HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig);
+    HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
     //Write 1 (only inserts)
     String newCommitTime = "001";
     String initCommitTime = "000";
@@ -182,7 +182,7 @@ public class TestHoodieReadClient extends 
TestHoodieClientBase {
                 .map(record -> new HoodieRecord(record.getKey(), null))
                 .collect(Collectors.toList()));
     // Should have 100 records in table (check using Index), all in locations 
marked at commit
-    HoodieReadClient readClient = new HoodieReadClient(jsc, 
hoodieWriteConfig.getBasePath());
+    HoodieReadClient readClient = 
getHoodieReadClient(hoodieWriteConfig.getBasePath());
     List<HoodieRecord> taggedRecords = 
readClient.tagLocation(recordRDD).collect();
     checkTaggedRecords(taggedRecords, newCommitTime);
 
@@ -201,7 +201,7 @@ public class TestHoodieReadClient extends 
TestHoodieClientBase {
                 .map(record -> new HoodieRecord(record.getKey(), null))
                 .collect(Collectors.toList()));
     // Index should be able to locate all updates in correct locations.
-    readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath());
+    readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath());
     taggedRecords = readClient.tagLocation(recordRDD).collect();
     checkTaggedRecords(taggedRecords, newCommitTime);
   }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java
index 2ae6c93..881481b 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java
@@ -48,6 +48,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -63,6 +64,7 @@ public class TestMultiFS implements Serializable {
   private static SQLContext sqlContext;
   private String tablePath = "file:///tmp/hoodie/sample-table";
   protected String tableName = "hoodie_rt";
+  private HoodieWriteClient hdfsWriteClient;
   private String tableType = HoodieTableType.COPY_ON_WRITE.name();
 
   @BeforeClass
@@ -83,6 +85,22 @@ public class TestMultiFS implements Serializable {
     sqlContext = new SQLContext(jsc);
   }
 
+  private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) 
throws Exception {
+    if (null != hdfsWriteClient) {
+      hdfsWriteClient.close();
+    }
+    hdfsWriteClient = new HoodieWriteClient(jsc, config);
+    return hdfsWriteClient;
+  }
+
+  @After
+  public void teardown() {
+    if (null != hdfsWriteClient) {
+      hdfsWriteClient.close();
+      hdfsWriteClient = null;
+    }
+  }
+
   @AfterClass
   public static void cleanupClass() throws Exception {
     if (jsc != null) {
@@ -98,6 +116,7 @@ public class TestMultiFS implements Serializable {
     FileSystem.closeAll();
   }
 
+
   protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
     return 
HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true)
         
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
@@ -118,7 +137,7 @@ public class TestMultiFS implements Serializable {
 
     //Create write client to write some records in
     HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath);
-    HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient hdfsWriteClient = getHoodieWriteClient(cfg);
 
     // Write generated data to hdfs (only inserts)
     String readCommitTime = hdfsWriteClient.startCommit();
@@ -139,7 +158,7 @@ public class TestMultiFS implements Serializable {
         .initTableType(jsc.hadoopConfiguration(), tablePath, 
HoodieTableType.valueOf(tableType), tableName,
             HoodieAvroPayload.class.getName());
     HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath);
-    HoodieWriteClient localWriteClient = new HoodieWriteClient(jsc, 
localConfig);
+    HoodieWriteClient localWriteClient = getHoodieWriteClient(localConfig);
 
     String writeCommitTime = localWriteClient.startCommit();
     logger.info("Starting write commit " + writeCommitTime);
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java 
b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
index 24bb4fa..f2c5d5c 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
@@ -84,6 +84,7 @@ public class TestHbaseIndex {
   private static String tableName = "test_table";
   private String basePath = null;
   private transient FileSystem fs;
+  private HoodieWriteClient writeClient;
 
   public TestHbaseIndex() throws Exception {
   }
@@ -113,6 +114,11 @@ public class TestHbaseIndex {
 
   @After
   public void clear() throws Exception {
+    if (null != writeClient) {
+      writeClient.close();
+      writeClient = null;
+    }
+
     if (basePath != null) {
       new File(basePath).delete();
     }
@@ -128,6 +134,14 @@ public class TestHbaseIndex {
     HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
   }
 
+  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws 
Exception {
+    if (null != writeClient) {
+      writeClient.close();
+    }
+    writeClient = new HoodieWriteClient(jsc, config);
+    return writeClient;
+  }
+
   @Test
   public void testSimpleTagLocationAndUpdate() throws Exception {
 
@@ -139,7 +153,7 @@ public class TestHbaseIndex {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
     writeClient.startCommit();
     HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, 
jsc);
@@ -178,7 +192,7 @@ public class TestHbaseIndex {
     // Load to memory
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
 
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
@@ -231,7 +245,7 @@ public class TestHbaseIndex {
     // only for test, set the hbaseConnection to mocked object
     index.setHbaseConnection(hbaseConnection);
 
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
@@ -258,7 +272,7 @@ public class TestHbaseIndex {
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
 
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java 
b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
index 40834b7..fade5d8 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java
@@ -64,6 +64,7 @@ public class TestHoodieCompactor {
   private transient HoodieTestDataGenerator dataGen = null;
   private transient FileSystem fs;
   private Configuration hadoopConf;
+  private HoodieWriteClient writeClient;
 
   @Before
   public void init() throws IOException {
@@ -84,6 +85,11 @@ public class TestHoodieCompactor {
 
   @After
   public void clean() {
+    if (null != writeClient) {
+      writeClient.close();
+      writeClient = null;
+    }
+
     if (basePath != null) {
       new File(basePath).delete();
     }
@@ -92,6 +98,14 @@ public class TestHoodieCompactor {
     }
   }
 
+  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws 
Exception {
+    if (null != writeClient) {
+      writeClient.close();
+    }
+    writeClient = new HoodieWriteClient(jsc, config);
+    return writeClient;
+  }
+
   private HoodieWriteConfig getConfig() {
     return getConfigBuilder()
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
@@ -123,7 +137,7 @@ public class TestHoodieCompactor {
     HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieWriteConfig config = getConfig();
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
 
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
@@ -140,7 +154,7 @@ public class TestHoodieCompactor {
   public void testWriteStatusContentsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig();
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
     String newCommitTime = "100";
     writeClient.startCommitWithTime(newCommitTime);
 
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java 
b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java
index 45ed0ae..44fa0e2 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java
@@ -63,6 +63,7 @@ public class TestHoodieMergeHandle {
   protected transient FileSystem fs;
   protected String basePath = null;
   protected transient HoodieTestDataGenerator dataGen = null;
+  private  HoodieWriteClient writeClient;
 
   @Before
   public void init() throws IOException {
@@ -83,6 +84,11 @@ public class TestHoodieMergeHandle {
 
   @After
   public void clean() {
+    if (null != writeClient) {
+      writeClient.close();
+      writeClient = null;
+    }
+
     if (basePath != null) {
       new File(basePath).delete();
     }
@@ -91,6 +97,14 @@ public class TestHoodieMergeHandle {
     }
   }
 
+  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws 
Exception {
+    if (null != writeClient) {
+      writeClient.close();
+    }
+    writeClient = new HoodieWriteClient(jsc, config);
+    return writeClient;
+  }
+
   @Test
   public void testUpsertsForMultipleRecordsInSameFile() throws Exception {
     // Create records in a single partition
@@ -99,7 +113,7 @@ public class TestHoodieMergeHandle {
 
     // Build a write config with bulkinsertparallelism set
     HoodieWriteConfig cfg = getConfigBuilder().build();
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient client = getWriteClient(cfg);
     FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
 
     /**
@@ -250,7 +264,7 @@ public class TestHoodieMergeHandle {
   public void testHoodieMergeHandleWriteStatMetrics() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfigBuilder().build();
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
     String newCommitTime = "100";
     writeClient.startCommitWithTime(newCommitTime);
 
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java 
b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
index 1c95e9f..5f22de0 100644
--- 
a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
+++ 
b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
@@ -92,6 +92,7 @@ public class TestMergeOnReadTable {
   private static HdfsTestService hdfsTestService;
   private transient JavaSparkContext jsc = null;
   private transient SQLContext sqlContext;
+  private HoodieWriteClient writeClient;
 
   @AfterClass
   public static void cleanUp() throws Exception {
@@ -139,6 +140,11 @@ public class TestMergeOnReadTable {
 
   @After
   public void clean() {
+    if (null != writeClient) {
+      writeClient.close();
+      writeClient = null;
+    }
+
     if (basePath != null) {
       new File(basePath).delete();
     }
@@ -147,10 +153,18 @@ public class TestMergeOnReadTable {
     }
   }
 
+  private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws 
Exception {
+    if (null != writeClient) {
+      writeClient.close();
+    }
+    writeClient = new HoodieWriteClient(jsc, config);
+    return writeClient;
+  }
+
   @Test
   public void testSimpleInsertAndUpdate() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient client = getWriteClient(cfg);
 
     /**
      * Write 1 (only inserts)
@@ -234,7 +248,7 @@ public class TestMergeOnReadTable {
   @Test
   public void testMetadataAggregateFromWriteStatus() throws Exception {
     HoodieWriteConfig cfg = 
getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient client = getWriteClient(cfg);
 
     String newCommitTime = "001";
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
@@ -257,7 +271,7 @@ public class TestMergeOnReadTable {
   @Test
   public void testSimpleInsertUpdateAndDelete() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient client = getWriteClient(cfg);
 
     /**
      * Write 1 (only inserts, written as parquet file)
@@ -342,7 +356,7 @@ public class TestMergeOnReadTable {
     HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, 
HoodieTableType.COPY_ON_WRITE);
 
     HoodieWriteConfig cfg = getConfig(true);
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient client = getWriteClient(cfg);
 
     /**
      * Write 1 (only inserts)
@@ -401,7 +415,7 @@ public class TestMergeOnReadTable {
   public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfig(false);
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient client = getWriteClient(cfg);
 
     // Test delta commit rollback
     /**
@@ -445,7 +459,7 @@ public class TestMergeOnReadTable {
      */
     final String commitTime1 = "002";
     // WriteClient with custom config (disable small file handling)
-    client = new HoodieWriteClient(jsc, 
getHoodieWriteConfigWithSmallFileHandlingOff());
+    client = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
     client.startCommitWithTime(commitTime1);
 
     List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -475,7 +489,7 @@ public class TestMergeOnReadTable {
      * Write 3 (inserts + updates - testing successful delta commit)
      */
     final String commitTime2 = "002";
-    client = new HoodieWriteClient(jsc, cfg);
+    client = getWriteClient(cfg);
     client.startCommitWithTime(commitTime2);
 
     copyOfRecords = new ArrayList<>(records);
@@ -566,7 +580,7 @@ public class TestMergeOnReadTable {
   public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception 
{
 
     HoodieWriteConfig cfg = getConfig(false);
-    final HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    final HoodieWriteClient client = getWriteClient(cfg);
     List<String> allCommits = new ArrayList<>();
     /**
      * Write 1 (only inserts)
@@ -611,7 +625,7 @@ public class TestMergeOnReadTable {
     newCommitTime = "002";
     allCommits.add(newCommitTime);
     // WriteClient with custom config (disable small file handling)
-    HoodieWriteClient nClient = new HoodieWriteClient(jsc, 
getHoodieWriteConfigWithSmallFileHandlingOff());
+    HoodieWriteClient nClient = 
getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff());
     nClient.startCommitWithTime(newCommitTime);
 
     List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -741,7 +755,7 @@ public class TestMergeOnReadTable {
   @Test
   public void testUpsertPartitioner() throws Exception {
     HoodieWriteConfig cfg = getConfig(true);
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient client = getWriteClient(cfg);
 
     /**
      * Write 1 (only inserts, written as parquet file)
@@ -822,7 +836,7 @@ public class TestMergeOnReadTable {
   public void testLogFileCountsAfterCompaction() throws Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig(true);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     String newCommitTime = "100";
     writeClient.startCommitWithTime(newCommitTime);
@@ -897,7 +911,7 @@ public class TestMergeOnReadTable {
   public void testMetadataValuesAfterInsertUpsertAndCompaction() throws 
Exception {
     // insert 100 records
     HoodieWriteConfig config = getConfig(false);
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     String newCommitTime = "100";
     writeClient.startCommitWithTime(newCommitTime);
@@ -940,7 +954,7 @@ public class TestMergeOnReadTable {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, 
IndexType.INMEMORY).build();
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     String newCommitTime = "100";
     writeClient.startCommitWithTime(newCommitTime);
@@ -979,7 +993,7 @@ public class TestMergeOnReadTable {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, 
IndexType.INMEMORY).build();
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     String newCommitTime = "100";
     writeClient.startCommitWithTime(newCommitTime);
@@ -1033,7 +1047,7 @@ public class TestMergeOnReadTable {
     // insert 100 records
     // Setting IndexType to be InMemory to simulate Global Index nature
     HoodieWriteConfig config = getConfigBuilder(false, 
IndexType.INMEMORY).build();
-    HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
+    HoodieWriteClient writeClient = getWriteClient(config);
     HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
     String newCommitTime = "100";
     writeClient.startCommitWithTime(newCommitTime);
@@ -1088,7 +1102,7 @@ public class TestMergeOnReadTable {
   public void testRollingStatsInMetadata() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, 
IndexType.INMEMORY).withAutoCommit(false).build();
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient client = getWriteClient(cfg);
     HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
 
@@ -1178,7 +1192,7 @@ public class TestMergeOnReadTable {
   public void testRollingStatsWithSmallFileHandling() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder(false, 
IndexType.INMEMORY).withAutoCommit(false).build();
-    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
+    HoodieWriteClient client = getWriteClient(cfg);
     HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     Map<String, Long> fileIdToInsertsMap = new HashMap<>();
     Map<String, Long> fileIdToUpsertsMap = new HashMap<>();

Reply via email to