This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 01e803b [HUDI-247] Unify the re-initialization of
HoodieTableMetaClient in test for hoodie-client module (#930)
01e803b is described below
commit 01e803b00e5acf9bb1dbe38340e38e8feb8d037d
Author: vinoyang <[email protected]>
AuthorDate: Mon Sep 30 20:38:52 2019 +0800
[HUDI-247] Unify the re-initialization of HoodieTableMetaClient in test for
hoodie-client module (#930)
---
.../org/apache/hudi/HoodieClientTestHarness.java | 13 ++--
.../src/test/java/org/apache/hudi/TestCleaner.java | 76 ++++++++++------------
.../java/org/apache/hudi/TestClientRollback.java | 6 +-
.../hudi/index/TestHBaseQPSResourceAllocator.java | 4 +-
.../java/org/apache/hudi/index/TestHbaseIndex.java | 19 +++---
.../org/apache/hudi/index/TestHoodieIndex.java | 4 +-
.../hudi/index/bloom/TestHoodieBloomIndex.java | 34 +++++-----
.../index/bloom/TestHoodieGlobalBloomIndex.java | 14 ++--
.../apache/hudi/io/TestHoodieCommitArchiveLog.java | 21 +++---
.../org/apache/hudi/io/TestHoodieCompactor.java | 15 ++---
.../org/apache/hudi/io/TestHoodieMergeHandle.java | 12 ++--
.../apache/hudi/table/TestCopyOnWriteTable.java | 23 ++++---
.../apache/hudi/table/TestMergeOnReadTable.java | 32 ++++-----
.../hudi/common/table/HoodieTableMetaClient.java | 10 +++
14 files changed, 142 insertions(+), 141 deletions(-)
diff --git
a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
index 84a9e21..10fb0bc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
@@ -55,6 +55,7 @@ public abstract class HoodieClientTestHarness implements
Serializable {
protected TemporaryFolder folder = null;
protected transient HoodieTestDataGenerator dataGen = null;
protected transient ExecutorService executorService;
+ protected transient HoodieTableMetaClient metaClient;
//dfs
protected String dfsBasePath;
@@ -72,7 +73,7 @@ public abstract class HoodieClientTestHarness implements
Serializable {
initSparkContexts();
initTestDataGenerator();
initFileSystem();
- initTableType();
+ initMetaClient();
}
/**
@@ -80,7 +81,7 @@ public abstract class HoodieClientTestHarness implements
Serializable {
* @throws IOException
*/
public void cleanupResources() throws IOException {
- cleanupTableType();
+ cleanupMetaClient();
cleanupSparkContexts();
cleanupTestDataGenerator();
cleanupFileSystem();
@@ -191,7 +192,7 @@ public abstract class HoodieClientTestHarness implements
Serializable {
*
* @throws IOException
*/
- protected void initTableType() throws IOException {
+ protected void initMetaClient() throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been
initialized.");
}
@@ -200,14 +201,14 @@ public abstract class HoodieClientTestHarness implements
Serializable {
throw new IllegalStateException("The Spark context has not been
initialized.");
}
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
+ metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
getTableType());
}
/**
* Cleanups table type.
*/
- protected void cleanupTableType() {
-
+ protected void cleanupMetaClient() {
+ metaClient = null;
}
/**
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
index 1e8df4a..fe25491 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
@@ -114,7 +114,7 @@ public class TestCleaner extends TestHoodieClientBase {
assertNoWriteErrors(statuses);
// verify that there is a commit
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = new
HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting a single commit.", 1,
timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
// Should have 100 records in table (check using Index), all in locations
marked at commit
@@ -200,8 +200,8 @@ public class TestCleaner extends TestHoodieClientBase {
insertFirstBigBatchForClientCleanerTest(cfg, client,
recordInsertGenWrappedFunction, insertFn);
Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice =
new HashMap<>();
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(),
jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(),
jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
TableFileSystemView fsView = table.getFileSystemView();
Option<Boolean> added =
Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst()
@@ -239,8 +239,8 @@ public class TestCleaner extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
- table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
TableFileSystemView fsView = table.getFileSystemView();
@@ -375,8 +375,8 @@ public class TestCleaner extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
Option<HoodieInstant> earliestRetainedCommit =
activeTimeline.nthFromLastInstant(maxCommits - 1);
Set<HoodieInstant> acceptableCommits =
activeTimeline.getInstants().collect(Collectors.toSet());
@@ -424,9 +424,8 @@ public class TestCleaner extends TestHoodieClientBase {
.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
String file1P1C0 = HoodieTestUtils
.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertEquals("Must not clean any files", 0,
@@ -442,8 +441,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001");
- table = HoodieTable.getHoodieTable(new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
String file2P0C1 = HoodieTestUtils
.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -472,8 +471,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002");
- table = HoodieTable.getHoodieTable(new
HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
- config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTestUtils
.createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); //
update
@@ -578,9 +577,8 @@ public class TestCleaner extends TestHoodieClientBase {
String file1P1C0 = HoodieTestUtils
.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertEquals("Must not clean any files", 0,
@@ -596,8 +594,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001");
- table = HoodieTable.getHoodieTable(new
HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
- config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
String file2P0C1 = HoodieTestUtils
.createNewDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -626,8 +624,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002");
- table = HoodieTable.getHoodieTable(new
HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
- config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTestUtils
.createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); //
update
@@ -646,8 +644,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "003");
- table = HoodieTable.getHoodieTable(new
HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
- config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTestUtils
.createDataFile(basePath,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); //
update
@@ -700,9 +698,8 @@ public class TestCleaner extends TestHoodieClientBase {
assertEquals("Some marker files are created.", markerFiles.size(),
getTotalTempFiles());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).build();
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
table.rollback(jsc, "000", true);
assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
@@ -722,9 +719,8 @@ public class TestCleaner extends TestHoodieClientBase {
// with just some commit metadata, but no data/partitionPaths.
HoodieTestUtils.createCommitFiles(basePath, "000");
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertTrue("HoodieCleanStats should be empty for a table with empty
partitionPaths", hoodieCleanStatsOne.isEmpty());
@@ -783,9 +779,8 @@ public class TestCleaner extends TestHoodieClientBase {
updateAllFilesInPartition(filesP1C0,
HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "003");
updateAllFilesInPartition(filesP2C0,
HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "003");
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
assertEquals(100,
@@ -890,9 +885,8 @@ public class TestCleaner extends TestHoodieClientBase {
for (int j = 1; j <= i; j++) {
if (j == i && j <= maxNumFileIdsForCompaction) {
expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config,
jsc);
FileSlice slice = table.getRTFileSystemView().getLatestFileSlices(
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
@@ -934,15 +928,13 @@ public class TestCleaner extends TestHoodieClientBase {
}
// Clean now
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
// Test for safety
- final HoodieTable hoodieTable = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(),
config.getBasePath(), true), config,
- jsc);
+ final HoodieTableMetaClient newMetaClient =
HoodieTableMetaClient.reload(metaClient);
+ final HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient,
config, jsc);
expFileIdToPendingCompaction.entrySet().stream().forEach(entry -> {
String fileId = entry.getKey();
@@ -961,7 +953,7 @@ public class TestCleaner extends TestHoodieClientBase {
// Test for progress (Did we clean some files ?)
long numFilesUnderCompactionDeleted =
hoodieCleanStats.stream().flatMap(cleanStat -> {
- return convertPathToFileIdWithCommitTime(metaClient,
cleanStat.getDeletePathPatterns()).map(
+ return convertPathToFileIdWithCommitTime(newMetaClient,
cleanStat.getDeletePathPatterns()).map(
fileIdWithCommitTime -> {
if
(expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
Assert.assertTrue("Deleted instant time must be less than
pending compaction",
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
index 20cc86c..11504a4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
@@ -97,7 +97,7 @@ public class TestClientRollback extends TestHoodieClientBase {
assertNoWriteErrors(statuses);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs,
cfg.getBasePath(),
getConfig().shouldAssumeDatePartitioning());
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(),
jsc);
final ReadOptimizedView view1 = table.getROFileSystemView();
@@ -122,7 +122,7 @@ public class TestClientRollback extends
TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view2 = table.getROFileSystemView();
@@ -143,7 +143,7 @@ public class TestClientRollback extends
TestHoodieClientBase {
HoodieInstant savepoint =
table.getCompletedSavepointTimeline().getInstants().findFirst().get();
client.rollbackToSavepoint(savepoint.getTimestamp());
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view3 = table.getROFileSystemView();
dataFiles = partitionPaths.stream().flatMap(s -> {
diff --git
a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
index 622395d..9efe708 100644
---
a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
+++
b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
@@ -53,14 +53,14 @@ public class TestHBaseQPSResourceAllocator extends
HoodieClientTestHarness {
initTempFolderAndPath();
basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
// Initialize table
- initTableType();
+ initMetaClient();
}
@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
- cleanupTableType();
+ cleanupMetaClient();
if (utility != null) {
utility.shutdownMiniCluster();
}
diff --git
a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 9d89186..6c2fc0f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -104,9 +103,8 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
// Create a temp folder as the base path
initTempFolderAndPath();
- // Initialize table
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
initTestDataGenerator();
+ initMetaClient();
}
@After
@@ -114,6 +112,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
cleanupSparkContexts();
cleanupTempFolderAndPath();
cleanupTestDataGenerator();
+ cleanupMetaClient();
}
private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws
Exception {
@@ -132,7 +131,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
HBaseIndex index = new HBaseIndex(config);
try (HoodieWriteClient writeClient = getWriteClient(config);) {
writeClient.startCommit();
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config,
jsc);
// Test tagLocation without any entries in index
@@ -151,7 +150,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
// Now commit this & update location of records inserted and validate no
errors
writeClient.commit(newCommitTime, writeStatues);
// Now tagLocation for these records, hbaseIndex should tag them
correctly
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
assertTrue(javaRDD.filter(record ->
record.isCurrentLocationKnown()).collect().size() == 200);
@@ -173,7 +172,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
HBaseIndex index = new HBaseIndex(config);
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
writeClient.startCommit();
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config,
jsc);
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords,
newCommitTime);
@@ -185,7 +184,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
// Now commit this & update location of records inserted and validate no
errors
writeClient.commit(newCommitTime, writeStatues);
// Now tagLocation for these records, hbaseIndex should tag them correctly
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc,
hoodieTable);
assertTrue(javaRDD.filter(record ->
record.isCurrentLocationKnown()).collect().size() == 10);
@@ -205,7 +204,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config,
jsc);
// Insert 200 records
@@ -257,7 +256,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config,
jsc);
// Insert 250 records
@@ -282,7 +281,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config,
jsc);
// Insert 200 records
diff --git
a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index d073757..4354216 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -36,14 +36,14 @@ public class TestHoodieIndex extends
HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts("TestHoodieIndex");
initTempFolderAndPath();
- initTableType();
+ initMetaClient();
}
@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
- cleanupTableType();
+ cleanupMetaClient();
}
@Test
diff --git
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 28d19eb..69d0cfa 100644
---
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -44,7 +44,6 @@ import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -92,10 +91,10 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
initSparkContexts("TestHoodieBloomIndex");
initTempFolderAndPath();
initFileSystem();
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
// We have some records to be tagged (two different partitions)
schemaStr =
FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(schemaStr));
+ initMetaClient();
}
@After
@@ -103,6 +102,7 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
cleanupSparkContexts();
cleanupFileSystem();
cleanupTempFolderAndPath();
+ cleanupMetaClient();
}
private HoodieWriteConfig makeConfig() {
@@ -163,8 +163,8 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
false);
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01",
"2015/03/12");
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<Tuple2<String, BloomIndexFileInfo>> filesList =
index.loadInvolvedFiles(partitions, jsc, table);
// Still 0, as no valid commit
assertEquals(filesList.size(), 0);
@@ -174,7 +174,7 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
- table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
filesList = index.loadInvolvedFiles(partitions, jsc, table);
assertEquals(filesList.size(), 4);
@@ -286,9 +286,9 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
// We have some records to be tagged (two different partitions)
JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
// Also create the metadata and config
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -331,9 +331,9 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1,
record2, record3, record4));
// Also create the metadata and config
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -353,8 +353,8 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31",
Arrays.asList(record4), schema, null, true);
// We do the tag again
- metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
@@ -401,9 +401,9 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
JavaRDD<HoodieKey> keysRDD = jsc.parallelize(Arrays.asList(key1, key2,
key3, key4));
// Also create the metadata and config
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -424,8 +424,8 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31",
Arrays.asList(record4), schema, null, true);
// We do the tag again
- metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
// Check results
@@ -473,9 +473,9 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
// We do the tag
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1,
record2));
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD,
jsc, table);
diff --git
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 9993cb4..11669a3 100644
---
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -66,16 +65,17 @@ public class TestHoodieGlobalBloomIndex extends
HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts("TestHoodieGlobalBloomIndex");
initTempFolderAndPath();
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
// We have some records to be tagged (two different partitions)
schemaStr =
FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(schemaStr));
+ initMetaClient();
}
@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
+ cleanupMetaClient();
}
@Test
@@ -127,8 +127,8 @@ public class TestHoodieGlobalBloomIndex extends
HoodieClientTestHarness {
// intentionally missed the partition "2015/03/12" to see if the
GlobalBloomIndex can pick it up
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// partitions will NOT be respected by this loadInvolvedFiles(...) call
List<Tuple2<String, BloomIndexFileInfo>> filesList =
index.loadInvolvedFiles(partitions, jsc, table);
// Still 0, as no valid commit
@@ -139,7 +139,7 @@ public class TestHoodieGlobalBloomIndex extends
HoodieClientTestHarness {
new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
- table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
filesList = index.loadInvolvedFiles(partitions, jsc, table);
assertEquals(filesList.size(), 4);
@@ -264,8 +264,8 @@ public class TestHoodieGlobalBloomIndex extends
HoodieClientTestHarness {
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12",
Arrays.asList(record4), schema, null, false);
// intentionally missed the partition "2015/03/12" to see if the
GlobalBloomIndex can pick it up
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Add some commits
diff --git
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 5dee6f6..9a9ddc0 100644
---
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -54,6 +54,7 @@ import org.junit.Test;
public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
private Configuration hadoopConf;
+ private HoodieTableMetaClient metaClient;
@Before
public void init() throws Exception {
@@ -63,7 +64,7 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
hadoopConf = dfs.getConf();
jsc.hadoopConfiguration().addResource(dfs.getConf());
dfs.mkdirs(new Path(basePath));
- HoodieTestUtils.init(hadoopConf, basePath);
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath);
}
@After
@@ -78,8 +79,8 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").build();
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
- new HoodieTableMetaClient(dfs.getConf(), cfg.getBasePath(), true));
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
metaClient);
boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
}
@@ -135,7 +136,7 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
"105"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6,
timeline.countInstants());
@@ -158,8 +159,8 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
// verify in-flight instants before archive
verifyInflightInstants(metaClient, 3);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
- new HoodieTableMetaClient(dfs.getConf(), basePath, true));
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
metaClient);
assertTrue(archiveLog.archiveIfRequired(jsc));
@@ -235,7 +236,7 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
5).build()).build();
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
metaClient);
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -302,7 +303,7 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
5).build()).build();
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
@@ -328,7 +329,7 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
5).build()).build();
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
@@ -360,7 +361,7 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2,
5).build()).build();
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101",
dfs.getConf());
diff --git
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
index 8d9a676..0a02b70 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
@@ -52,6 +52,7 @@ import org.junit.Test;
public class TestHoodieCompactor extends HoodieClientTestHarness {
private Configuration hadoopConf;
+ private HoodieTableMetaClient metaClient;
@Before
public void setUp() throws Exception {
@@ -62,7 +63,7 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
initTempFolderAndPath();
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath, hadoopConf);
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath,
HoodieTableType.MERGE_ON_READ);
initTestDataGenerator();
}
@@ -96,9 +97,7 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
@Test(expected = HoodieNotSupportedException.class)
public void testCompactionOnCopyOnWriteFail() throws Exception {
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath,
HoodieTableType.COPY_ON_WRITE);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(),
jsc);
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc,
compactionInstantTime));
@@ -106,8 +105,8 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
@Test
public void testCompactionEmpty() throws Exception {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = getConfig();
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
try (HoodieWriteClient writeClient = getWriteClient(config);) {
@@ -136,7 +135,7 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
List<WriteStatus> statuses = writeClient.insert(recordsRDD,
newCommitTime).collect();
// Update all the 100 records
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
newCommitTime = "101";
@@ -153,7 +152,7 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
updatedRecords);
// Verify that all data file has one log file
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles =
table.getRTFileSystemView().getLatestFileSlices(partitionPath)
@@ -164,7 +163,7 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
}
// Do a compaction
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
String compactionInstantTime =
HoodieActiveTimeline.createNewCommitTime();
diff --git
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 288e04c..68db6ca 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -58,17 +58,17 @@ public class TestHoodieMergeHandle extends
HoodieClientTestHarness {
initSparkContexts("TestHoodieMergeHandle");
initTempFolderAndPath();
initFileSystem();
- initTableType();
initTestDataGenerator();
+ initMetaClient();
}
@After
public void tearDown() throws Exception {
- cleanupTableType();
cleanupFileSystem();
cleanupTestDataGenerator();
cleanupTempFolderAndPath();
cleanupSparkContexts();
+ cleanupMetaClient();
}
private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws
Exception {
@@ -109,7 +109,7 @@ public class TestHoodieMergeHandle extends
HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// verify that there is a commit
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = new
HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting a single commit.", 1,
timeline.findInstantsAfter("000",
Integer.MAX_VALUE).countInstants());
@@ -137,7 +137,7 @@ public class TestHoodieMergeHandle extends
HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// verify that there are 2 commits
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting two commits.", 2,
timeline.findInstantsAfter("000", Integer.MAX_VALUE)
.countInstants());
@@ -161,7 +161,7 @@ public class TestHoodieMergeHandle extends
HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// verify that there are now 3 commits
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting three commits.", 3,
timeline.findInstantsAfter("000", Integer.MAX_VALUE)
.countInstants());
@@ -259,7 +259,7 @@ public class TestHoodieMergeHandle extends
HoodieClientTestHarness {
.map(status -> status.getStat().getNumInserts()).reduce((a, b) -> a
+ b).get(), 100);
// Update all the 100 records
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
newCommitTime = "101";
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 6439d75..2c46fa9 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -73,7 +73,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts("TestCopyOnWriteTable");
initTempFolderAndPath();
- initTableType();
+ initMetaClient();
initTestDataGenerator();
initFileSystem();
}
@@ -82,7 +82,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
- cleanupTableType();
+ cleanupMetaClient();
cleanupFileSystem();
cleanupTestDataGenerator();
}
@@ -94,7 +94,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
String commitTime = HoodieTestUtils.makeNewCommitTime();
HoodieWriteConfig config = makeHoodieClientConfig();
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
Pair<Path, String> newPathWithWriteToken =
jsc.parallelize(Arrays.asList(1)).map(x -> {
@@ -127,7 +127,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
// Prepare the AvroParquetIO
HoodieWriteConfig config = makeHoodieClientConfig();
String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
String partitionPath = "/2016/01/31";
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
@@ -203,7 +203,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
Thread.sleep(1000);
String newCommitTime = HoodieTestUtils.makeNewCommitTime();
- metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config,
jsc);
List<WriteStatus> statuses =
jsc.parallelize(Arrays.asList(1)).map(x -> {
@@ -271,7 +271,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
HoodieWriteConfig config =
makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class)
.build();
String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
@@ -308,8 +308,8 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
public void testInsertRecords() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfig();
String commitTime = HoodieTestUtils.makeNewCommitTime();
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
// Case 1:
// 10 records for partition 1, 1 record for partition 2.
@@ -362,7 +362,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
HoodieStorageConfig.newBuilder().limitFileSize(64 *
1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024)
.build()).build();
String commitTime = HoodieTestUtils.makeNewCommitTime();
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
List<HoodieRecord> records = new ArrayList<>();
@@ -401,8 +401,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
HoodieClientTestUtils.fakeCommitFile(basePath, "001");
HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001",
"file1", fileSize);
-
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new
String[]{testPartitionPath});
@@ -476,7 +475,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
HoodieWriteConfig config =
makeHoodieClientConfigBuilder().withStorageConfig(
HoodieStorageConfig.newBuilder().limitFileSize(1000 *
1024).build()).build();
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config,
jsc);
String commitTime = "000";
// Perform inserts of 100 records to test CreateHandle and BufferedExecutor
@@ -487,7 +486,7 @@ public class TestCopyOnWriteTable extends
HoodieClientTestHarness {
WriteStatus writeStatus = ws.get(0).get(0);
String fileId = writeStatus.getFileId();
- metadata.getFs().create(new Path(basePath +
"/.hoodie/000.commit")).close();
+ metaClient.getFs().create(new Path(basePath +
"/.hoodie/000.commit")).close();
final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config,
jsc);
final List<HoodieRecord> updates =
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 833b39a..788c783 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -156,7 +156,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
statuses = client.upsert(jsc.parallelize(records, 1),
newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 004", "004",
deltaCommit.get().getTimestamp());
@@ -173,7 +173,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath(), true);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline =
metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals("Expecting a single commit.", 1,
timeline.findInstantsAfter("000",
Integer.MAX_VALUE).countInstants());
@@ -270,7 +270,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 004", "004",
deltaCommit.get().getTimestamp());
@@ -335,7 +335,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
//rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg,
jsc);
FileStatus[] allFiles =
HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
HoodieTableFileSystemView roView = new
HoodieTableFileSystemView(metaClient,
@@ -454,7 +454,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file ->
file.getPath().getName()
.contains(commitTime2)).collect(Collectors.toList()).size(), 0);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient,
hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFiles = roView.getLatestDataFiles().map(hf ->
hf.getPath()).collect(Collectors.toList());
@@ -477,14 +477,14 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
String compactionInstantTime =
thirdClient.scheduleCompaction(Option.empty()).get().toString();
JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
thirdClient.commitCompaction(compactionInstantTime, ws,
Option.empty());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(),
cfg.getBasePath());
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline(), allFiles);
List<HoodieDataFile> dataFiles2 =
roView.getLatestDataFiles().collect(Collectors.toList());
@@ -504,7 +504,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
thirdClient.rollback(compactedCommitTime);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(),
cfg.getBasePath());
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline(), allFiles);
@@ -603,7 +603,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
String compactionInstantTime = "004";
allCommits.add(compactionInstantTime);
@@ -626,7 +626,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
compactionInstantTime = "006";
allCommits.add(compactionInstantTime);
@@ -635,7 +635,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
client.commitCompaction(compactionInstantTime, ws, Option.empty());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(),
cfg.getBasePath());
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
roView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant()
@@ -669,7 +669,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
// Rollback latest commit first
client.restoreToInstant("000");
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(),
cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
@@ -754,7 +754,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 002", "002",
deltaCommit.get().getTimestamp());
@@ -811,7 +811,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
HoodieTestDataGenerator.avroSchemaWithMetadataFields,
updatedRecords);
// Verify that all data file has one log file
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// In writeRecordsToLogFiles, no commit files are getting added, so
resetting file-system view state
((SyncableFileSystemView) (table.getRTFileSystemView())).reset();
@@ -833,7 +833,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
JavaRDD<WriteStatus> result =
writeClient.compact(compactionInstantTime);
// Verify that recently written compacted data file has no log file
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
@@ -949,7 +949,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
.copyToLocalFile(new Path(metaClient.getMetaPath(), fileName), new
Path(file.getAbsolutePath()));
writeClient.rollback(newCommitTime);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index e0c30be..7cf095b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -71,6 +71,7 @@ public class HoodieTableMetaClient implements Serializable {
private String basePath;
private transient HoodieWrapperFileSystem fs;
private String metaPath;
+ private boolean loadActiveTimelineOnLoad;
private SerializableConfiguration hadoopConf;
private HoodieTableType tableType;
private HoodieTableConfig tableConfig;
@@ -104,6 +105,7 @@ public class HoodieTableMetaClient implements Serializable {
this.tableConfig = new HoodieTableConfig(fs, metaPath);
this.tableType = tableConfig.getTableType();
log.info("Finished Loading Table of type " + tableType + " from " + basePath);
+ this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
if (loadActiveTimelineOnLoad) {
log.info("Loading Active commit timeline for " + basePath);
getActiveTimeline();
@@ -118,6 +120,14 @@ public class HoodieTableMetaClient implements Serializable {
public HoodieTableMetaClient() {
}
+ public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
+ return new HoodieTableMetaClient(
+ oldMetaClient.hadoopConf.get(),
+ oldMetaClient.basePath,
+ oldMetaClient.loadActiveTimelineOnLoad,
+ oldMetaClient.consistencyGuardConfig);
+ }
+
/**
* This method is only used when this object is deserialized in a spark executor.
*