This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new cd7623e All Opened hoodie clients in tests needs to be closed TestMergeOnReadTable must use embedded timeline server cd7623e is described below commit cd7623e2160abd43ea44f48a9640ea8fa0bb6db3 Author: Balaji Varadarajan <varad...@uber.com> AuthorDate: Wed Jun 12 18:28:49 2019 -0700 All Opened hoodie clients in tests needs to be closed TestMergeOnReadTable must use embedded timeline server --- .../java/com/uber/hoodie/TestAsyncCompaction.java | 2 +- .../java/com/uber/hoodie/TestClientRollback.java | 6 +-- .../java/com/uber/hoodie/TestHoodieClientBase.java | 37 ++++++++++++----- .../TestHoodieClientOnCopyOnWriteStorage.java | 14 +++---- .../java/com/uber/hoodie/TestHoodieReadClient.java | 12 +++--- .../src/test/java/com/uber/hoodie/TestMultiFS.java | 23 ++++++++++- .../java/com/uber/hoodie/index/TestHbaseIndex.java | 22 ++++++++-- .../com/uber/hoodie/io/TestHoodieCompactor.java | 18 +++++++- .../com/uber/hoodie/io/TestHoodieMergeHandle.java | 18 +++++++- .../uber/hoodie/table/TestMergeOnReadTable.java | 48 ++++++++++++++-------- 10 files changed, 146 insertions(+), 54 deletions(-) diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java index a86edc4..4fcc32a 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java @@ -92,7 +92,7 @@ public class TestAsyncCompaction extends TestHoodieClientBase { public void testRollbackForInflightCompaction() throws Exception { // Rollback inflight compaction HoodieWriteConfig cfg = getConfig(false); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg, true); + HoodieWriteClient client = getHoodieWriteClient(cfg, true); String firstInstantTime = "001"; String secondInstantTime = "004"; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java b/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java index f08c343..8a6f18d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestClientRollback.java @@ -204,7 +204,7 @@ public class TestClientRollback extends TestHoodieClientBase { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); - HoodieWriteClient client = new HoodieWriteClient(jsc, config, false); + HoodieWriteClient client = getHoodieWriteClient(config, false); // Rollback commit 1 (this should fail, since commit2 is still around) try { @@ -294,7 +294,7 @@ public class TestClientRollback extends TestHoodieClientBase { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).withIndexConfig( HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build(); - new HoodieWriteClient(jsc, config, false); + getHoodieWriteClient(config, false); // Check results, nothing changed assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1)); @@ -311,7 +311,7 @@ public class TestClientRollback extends TestHoodieClientBase { && HoodieTestUtils.doesDataFileExist(basePath, "2016/05/06", commitTime1, file13)); // Turn auto rollback on - new HoodieWriteClient(jsc, config, true).startCommit(); + getHoodieWriteClient(config, true).startCommit(); assertTrue(HoodieTestUtils.doesCommitExist(basePath, commitTime1)); assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime2)); assertFalse(HoodieTestUtils.doesInflightExist(basePath, commitTime3)); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java index 850f45f..a668d3b 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java @@ -81,27 +81,43 @@ public class TestHoodieClientBase implements Serializable { protected transient HoodieTestDataGenerator dataGen = null; private HoodieWriteClient writeClient; + private HoodieReadClient readClient; - protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws Exception { - closeClient(); - writeClient = new HoodieWriteClient(jsc, cfg); - return writeClient; + protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) { + return getHoodieWriteClient(cfg, false); } - protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) - throws Exception { - closeClient(); - writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit); + protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) { + return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, jsc)); + } + + protected HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit, + HoodieIndex index) { + closeWriteClient(); + writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index); return writeClient; } - private void closeClient() { + protected HoodieReadClient getHoodieReadClient(String basePath) { + closeReadClient(); + readClient = new HoodieReadClient(jsc, basePath); + return readClient; + } + + private void closeWriteClient() { if (null != writeClient) { writeClient.close(); writeClient = null; } } + private void closeReadClient() { + if (null != readClient) { + readClient.close(); + readClient = null; + } + } + @Before public void init() throws IOException { // Initialize a local spark env @@ -132,7 +148,8 @@ public class TestHoodieClientBase implements Serializable { * Properly release resources at end of each test */ public void tearDown() throws IOException { - closeClient(); + closeWriteClient(); + closeReadClient(); if (null != sqlContext) { logger.info("Clearing sql context cache of spark-session used in previous test-case"); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 6bcfcd9..aa53d9f 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -215,8 +215,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { assertNodupesWithinPartition(dedupedRecs); // Perform write-action and check - HoodieWriteClient client = new HoodieWriteClient(jsc, - getConfigBuilder().combineInput(true, true).build()); + HoodieWriteClient client = getHoodieWriteClient( + getConfigBuilder().combineInput(true, true).build(), false); client.startCommitWithTime(newCommitTime); List<WriteStatus> statuses = writeFn.apply(client, records, newCommitTime).collect(); assertNoWriteErrors(statuses); @@ -236,7 +236,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { private HoodieWriteClient getWriteClientWithDummyIndex(final boolean isGlobal) throws Exception { HoodieIndex index = mock(HoodieIndex.class); when(index.isGlobal()).thenReturn(isGlobal); - return new HoodieWriteClient(jsc, getConfigBuilder().build(), false, index); + return getHoodieWriteClient(getConfigBuilder().build(), false, index); } /** @@ -267,7 +267,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig, Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPrepped) throws Exception { - HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig); + HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig, false); //Write 1 (only inserts) String newCommitTime = "001"; @@ -291,7 +291,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { */ @Test public void testDeletes() throws Exception { - HoodieWriteClient client = new HoodieWriteClient(jsc, getConfig()); + HoodieWriteClient client = getHoodieWriteClient(getConfig(), false); /** * Write 1 (inserts and deletes) @@ -347,7 +347,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); - HoodieWriteClient client = new HoodieWriteClient(jsc, config); + HoodieWriteClient client = getHoodieWriteClient(config, false); // Inserts => will write file1 String commitTime1 = "001"; @@ -457,7 +457,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase { // setup the small file handling params HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); - HoodieWriteClient client = new HoodieWriteClient(jsc, config); + HoodieWriteClient client = getHoodieWriteClient(config, false); // Inserts => will write file1 String commitTime1 = "001"; diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java index 6608d81..2101577 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java @@ -89,12 +89,12 @@ public class TestHoodieReadClient extends TestHoodieClientBase { */ private void testReadFilterExist(HoodieWriteConfig config, Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn) throws Exception { - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getHoodieWriteClient(config); String newCommitTime = writeClient.startCommit(); List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 1); - HoodieReadClient readClient = new HoodieReadClient(jsc, config.getBasePath()); + HoodieReadClient readClient = getHoodieReadClient(config.getBasePath()); JavaRDD<HoodieRecord> filteredRDD = readClient.filterExists(recordsRDD); // Should not find any files @@ -106,7 +106,7 @@ public class TestHoodieReadClient extends TestHoodieClientBase { // Verify there are no errors assertNoWriteErrors(statuses); - readClient = new HoodieReadClient(jsc, config.getBasePath()); + readClient = getHoodieReadClient(config.getBasePath()); filteredRDD = readClient.filterExists(recordsRDD); List<HoodieRecord> result = filteredRDD.collect(); // Check results @@ -166,7 +166,7 @@ public class TestHoodieReadClient extends TestHoodieClientBase { Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> updateFn, boolean isPrepped) throws Exception { - HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig); + HoodieWriteClient client = getHoodieWriteClient(hoodieWriteConfig); //Write 1 (only inserts) String newCommitTime = "001"; String initCommitTime = "000"; @@ -182,7 +182,7 @@ public class TestHoodieReadClient extends TestHoodieClientBase { .map(record -> new HoodieRecord(record.getKey(), null)) .collect(Collectors.toList())); // Should have 100 records in table (check using Index), all in locations marked at commit - HoodieReadClient readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath()); + HoodieReadClient readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath()); List<HoodieRecord> taggedRecords = readClient.tagLocation(recordRDD).collect(); checkTaggedRecords(taggedRecords, newCommitTime); @@ -201,7 +201,7 @@ public class TestHoodieReadClient extends TestHoodieClientBase { .map(record -> new HoodieRecord(record.getKey(), null)) .collect(Collectors.toList())); // Index should be able to locate all updates in correct locations. - readClient = new HoodieReadClient(jsc, hoodieWriteConfig.getBasePath()); + readClient = getHoodieReadClient(hoodieWriteConfig.getBasePath()); taggedRecords = readClient.tagLocation(recordRDD).collect(); checkTaggedRecords(taggedRecords, newCommitTime); } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java b/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java index 2ae6c93..881481b 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestMultiFS.java @@ -48,6 +48,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -63,6 +64,7 @@ public class TestMultiFS implements Serializable { private static SQLContext sqlContext; private String tablePath = "file:///tmp/hoodie/sample-table"; protected String tableName = "hoodie_rt"; + private HoodieWriteClient hdfsWriteClient; private String tableType = HoodieTableType.COPY_ON_WRITE.name(); @BeforeClass @@ -83,6 +85,22 @@ public class TestMultiFS implements Serializable { sqlContext = new SQLContext(jsc); } + private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) throws Exception { + if (null != hdfsWriteClient) { + hdfsWriteClient.close(); + } + hdfsWriteClient = new HoodieWriteClient(jsc, config); + return hdfsWriteClient; + } + + @After + public void teardown() { + if (null != hdfsWriteClient) { + hdfsWriteClient.close(); + hdfsWriteClient = null; + } + } + @AfterClass public static void cleanupClass() throws Exception { if (jsc != null) { @@ -98,6 +116,7 @@ public class TestMultiFS implements Serializable { FileSystem.closeAll(); } + protected HoodieWriteConfig getHoodieWriteConfig(String basePath) { return HoodieWriteConfig.newBuilder().withPath(basePath).withEmbeddedTimelineServerEnabled(true) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2) @@ -118,7 +137,7 @@ public class TestMultiFS implements Serializable { //Create write client to write some records in HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath); - HoodieWriteClient hdfsWriteClient = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient hdfsWriteClient = getHoodieWriteClient(cfg); // Write generated data to hdfs (only inserts) String readCommitTime = hdfsWriteClient.startCommit(); @@ -139,7 +158,7 @@ public class TestMultiFS implements Serializable { .initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType), tableName, HoodieAvroPayload.class.getName()); HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath); - HoodieWriteClient localWriteClient = new HoodieWriteClient(jsc, localConfig); + HoodieWriteClient localWriteClient = getHoodieWriteClient(localConfig); String writeCommitTime = localWriteClient.startCommit(); logger.info("Starting write commit " + writeCommitTime); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java index 24bb4fa..f2c5d5c 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java @@ -84,6 +84,7 @@ public class TestHbaseIndex { private static String tableName = "test_table"; private String basePath = null; private transient FileSystem fs; + private HoodieWriteClient writeClient; public TestHbaseIndex() throws Exception { } @@ -113,6 +114,11 @@ public class TestHbaseIndex { @After public void clear() throws Exception { + if (null != writeClient) { + writeClient.close(); + writeClient = null; + } + if (basePath != null) { new File(basePath).delete(); } @@ -128,6 +134,14 @@ public class TestHbaseIndex { HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath); } + private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { + if (null != writeClient) { + writeClient.close(); + } + writeClient = new HoodieWriteClient(jsc, config); + return writeClient; + } + @Test public void testSimpleTagLocationAndUpdate() throws Exception { @@ -139,7 +153,7 @@ public class TestHbaseIndex { // Load to memory HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); writeClient.startCommit(); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc); @@ -178,7 +192,7 @@ public class TestHbaseIndex { // Load to memory HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); String newCommitTime = writeClient.startCommit(); List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200); @@ -231,7 +245,7 @@ public class TestHbaseIndex { // only for test, set the hbaseConnection to mocked object index.setHbaseConnection(hbaseConnection); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); // start a commit and generate test data String newCommitTime = writeClient.startCommit(); @@ -258,7 +272,7 @@ public class TestHbaseIndex { HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); HoodieWriteConfig config = getConfig(); HBaseIndex index = new HBaseIndex(config); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); // start a commit and generate test data String newCommitTime = writeClient.startCommit(); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java index 40834b7..fade5d8 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCompactor.java @@ -64,6 +64,7 @@ public class TestHoodieCompactor { private transient HoodieTestDataGenerator dataGen = null; private transient FileSystem fs; private Configuration hadoopConf; + private HoodieWriteClient writeClient; @Before public void init() throws IOException { @@ -84,6 +85,11 @@ public class TestHoodieCompactor { @After public void clean() { + if (null != writeClient) { + writeClient.close(); + writeClient = null; + } + if (basePath != null) { new File(basePath).delete(); } @@ -92,6 +98,14 @@ public class TestHoodieCompactor { } } + private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { + if (null != writeClient) { + writeClient.close(); + } + writeClient = new HoodieWriteClient(jsc, config); + return writeClient; + } + private HoodieWriteConfig getConfig() { return getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build()) @@ -123,7 +137,7 @@ public class TestHoodieCompactor { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieWriteConfig config = getConfig(); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); String newCommitTime = writeClient.startCommit(); List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100); @@ -140,7 +154,7 @@ public class TestHoodieCompactor { public void testWriteStatusContentsAfterCompaction() throws Exception { // insert 100 records HoodieWriteConfig config = getConfig(); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java index 45ed0ae..44fa0e2 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieMergeHandle.java @@ -63,6 +63,7 @@ public class TestHoodieMergeHandle { protected transient FileSystem fs; protected String basePath = null; protected transient HoodieTestDataGenerator dataGen = null; + private HoodieWriteClient writeClient; @Before public void init() throws IOException { @@ -83,6 +84,11 @@ public class TestHoodieMergeHandle { @After public void clean() { + if (null != writeClient) { + writeClient.close(); + writeClient = null; + } + if (basePath != null) { new File(basePath).delete(); } @@ -91,6 +97,14 @@ public class TestHoodieMergeHandle { } } + private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { + if (null != writeClient) { + writeClient.close(); + } + writeClient = new HoodieWriteClient(jsc, config); + return writeClient; + } + @Test public void testUpsertsForMultipleRecordsInSameFile() throws Exception { // Create records in a single partition @@ -99,7 +113,7 @@ public class TestHoodieMergeHandle { // Build a write config with bulkinsertparallelism set HoodieWriteConfig cfg = getConfigBuilder().build(); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient client = getWriteClient(cfg); FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration()); /** @@ -250,7 +264,7 @@ public class TestHoodieMergeHandle { public void testHoodieMergeHandleWriteStatMetrics() throws Exception { // insert 100 records HoodieWriteConfig config = getConfigBuilder().build(); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 1c95e9f..5f22de0 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -92,6 +92,7 @@ public class TestMergeOnReadTable { private static HdfsTestService hdfsTestService; private transient JavaSparkContext jsc = null; private transient SQLContext sqlContext; + private HoodieWriteClient writeClient; @AfterClass public static void cleanUp() throws Exception { @@ -139,6 +140,11 @@ public class TestMergeOnReadTable { @After public void clean() { + if (null != writeClient) { + writeClient.close(); + writeClient = null; + } + if (basePath != null) { new File(basePath).delete(); } @@ -147,10 +153,18 @@ public class TestMergeOnReadTable { } } + private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception { + if (null != writeClient) { + writeClient.close(); + } + writeClient = new HoodieWriteClient(jsc, config); + return writeClient; + } + @Test public void testSimpleInsertAndUpdate() throws Exception { HoodieWriteConfig cfg = getConfig(true); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient client = getWriteClient(cfg); /** * Write 1 (only inserts) @@ -234,7 +248,7 @@ public class TestMergeOnReadTable { @Test public void testMetadataAggregateFromWriteStatus() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build(); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient client = getWriteClient(cfg); String newCommitTime = "001"; HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); @@ -257,7 +271,7 @@ public class TestMergeOnReadTable { @Test public void testSimpleInsertUpdateAndDelete() throws Exception { HoodieWriteConfig cfg = getConfig(true); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient client = getWriteClient(cfg); /** * Write 1 (only inserts, written as parquet file) @@ -342,7 +356,7 @@ public class TestMergeOnReadTable { HoodieTestUtils.initTableType(jsc.hadoopConfiguration(), basePath, HoodieTableType.COPY_ON_WRITE); HoodieWriteConfig cfg = getConfig(true); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient client = getWriteClient(cfg); /** * Write 1 (only inserts) @@ -401,7 +415,7 @@ public class TestMergeOnReadTable { public void testRollbackWithDeltaAndCompactionCommit() throws Exception { HoodieWriteConfig cfg = getConfig(false); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient client = getWriteClient(cfg); // Test delta commit rollback /** @@ -445,7 +459,7 @@ public class TestMergeOnReadTable { */ final String commitTime1 = "002"; // WriteClient with custom config (disable small file handling) - client = new HoodieWriteClient(jsc, getHoodieWriteConfigWithSmallFileHandlingOff()); + client = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff()); client.startCommitWithTime(commitTime1); List<HoodieRecord> copyOfRecords = new ArrayList<>(records); @@ -475,7 +489,7 @@ public class TestMergeOnReadTable { * Write 3 (inserts + updates - testing successful delta commit) */ final String commitTime2 = "002"; - client = new HoodieWriteClient(jsc, cfg); + client = getWriteClient(cfg); client.startCommitWithTime(commitTime2); copyOfRecords = new ArrayList<>(records); @@ -566,7 +580,7 @@ public class TestMergeOnReadTable { public void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception { HoodieWriteConfig cfg = getConfig(false); - final HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + final HoodieWriteClient client = getWriteClient(cfg); List<String> allCommits = new ArrayList<>(); /** * Write 1 (only inserts) @@ -611,7 +625,7 @@ public class TestMergeOnReadTable { newCommitTime = "002"; allCommits.add(newCommitTime); // WriteClient with custom config (disable small file handling) - HoodieWriteClient nClient = new HoodieWriteClient(jsc, getHoodieWriteConfigWithSmallFileHandlingOff()); + HoodieWriteClient nClient = getWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff()); nClient.startCommitWithTime(newCommitTime); List<HoodieRecord> copyOfRecords = new ArrayList<>(records); @@ -741,7 +755,7 @@ public class TestMergeOnReadTable { @Test public void testUpsertPartitioner() throws Exception { HoodieWriteConfig cfg = getConfig(true); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient client = getWriteClient(cfg); /** * Write 1 (only inserts, written as parquet file) @@ -822,7 +836,7 @@ public class TestMergeOnReadTable { public void testLogFileCountsAfterCompaction() throws Exception { // insert 100 records HoodieWriteConfig config = getConfig(true); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -897,7 +911,7 @@ public class TestMergeOnReadTable { public void testMetadataValuesAfterInsertUpsertAndCompaction() throws Exception { // insert 100 records HoodieWriteConfig config = getConfig(false); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -940,7 +954,7 @@ public class TestMergeOnReadTable { // insert 100 records // Setting IndexType to be InMemory to simulate Global Index nature HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build(); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -979,7 +993,7 @@ public class TestMergeOnReadTable { // insert 100 records // Setting IndexType to be InMemory to simulate Global Index nature HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build(); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -1033,7 +1047,7 @@ public class TestMergeOnReadTable { // insert 100 records // Setting IndexType to be InMemory to simulate Global Index nature HoodieWriteConfig config = getConfigBuilder(false, IndexType.INMEMORY).build(); - HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieWriteClient writeClient = getWriteClient(config); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -1088,7 +1102,7 @@ public class TestMergeOnReadTable { public void testRollingStatsInMetadata() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient client = getWriteClient(cfg); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); @@ -1178,7 +1192,7 @@ public class TestMergeOnReadTable { public void testRollingStatsWithSmallFileHandling() throws Exception { HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build(); - HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + HoodieWriteClient client = getWriteClient(cfg); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); Map<String, Long> fileIdToInsertsMap = new HashMap<>(); Map<String, Long> fileIdToUpsertsMap = new HashMap<>();