This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b54517a [HUDI-886] Replace jsc.hadoopConfiguration by hadoop
configuration in hudi-client testcase (#1621)
b54517a is described below
commit b54517aad07750071c9df164d9e573ed3335652d
Author: Shen Hong <[email protected]>
AuthorDate: Tue May 12 23:51:31 2020 +0800
[HUDI-886] Replace jsc.hadoopConfiguration by hadoop configuration in
hudi-client testcase (#1621)
---
.../apache/hudi/client/TestHoodieClientBase.java | 8 ++--
.../TestHoodieClientOnCopyOnWriteStorage.java | 36 +++++++++---------
.../java/org/apache/hudi/client/TestMultiFS.java | 4 +-
.../hudi/common/HoodieClientTestHarness.java | 4 +-
.../java/org/apache/hudi/index/TestHbaseIndex.java | 2 +-
.../hudi/index/bloom/TestHoodieBloomIndex.java | 2 +-
.../apache/hudi/io/TestHoodieCommitArchiveLog.java | 2 +-
.../org/apache/hudi/io/TestHoodieMergeHandle.java | 2 +-
.../java/org/apache/hudi/table/TestCleaner.java | 4 +-
.../apache/hudi/table/TestMergeOnReadTable.java | 44 +++++++++++-----------
.../commit/TestCopyOnWriteActionExecutor.java | 10 ++---
.../table/action/compact/TestAsyncCompaction.java | 38 +++++++++----------
12 files changed, 78 insertions(+), 78 deletions(-)
diff --git
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 89f109a..c8f9fdd 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -249,7 +249,7 @@ public class TestHoodieClientBase extends
HoodieClientTestHarness {
return (commit, numRecords) -> {
final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
- final HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+ final HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(hadoopConf, basePath, true);
HoodieTable table = HoodieTable.create(metaClient, writeConfig,
hadoopConf);
JavaRDD<HoodieRecord> taggedRecords =
index.tagLocation(jsc.parallelize(records, 1), jsc, table);
return taggedRecords.collect();
@@ -270,7 +270,7 @@ public class TestHoodieClientBase extends
HoodieClientTestHarness {
return (numRecords) -> {
final HoodieIndex index = HoodieIndex.createIndex(writeConfig);
List<HoodieKey> records = keyGenFunction.apply(numRecords);
- final HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+ final HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(hadoopConf, basePath, true);
HoodieTable table = HoodieTable.create(metaClient, writeConfig,
hadoopConf);
JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
@@ -467,7 +467,7 @@ public class TestHoodieClientBase extends
HoodieClientTestHarness {
assertPartitionMetadataForRecords(records, fs);
// verify that there is a commit
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
HoodieTimeline timeline = new
HoodieActiveTimeline(metaClient).getCommitTimeline();
if (assertForCommit) {
@@ -535,7 +535,7 @@ public class TestHoodieClientBase extends
HoodieClientTestHarness {
assertPartitionMetadataForKeys(keysToDelete, fs);
// verify that there is a commit
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
HoodieTimeline timeline = new
HoodieActiveTimeline(metaClient).getCommitTimeline();
if (assertForCommit) {
diff --git
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 246ba37..223c39d 100644
---
a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -478,7 +478,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
- readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath,
statuses.get(0).getStat().getPath()))
+ readRowKeysFromParquet(hadoopConf, new Path(basePath,
statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Update + Inserts such that they just expand file1
@@ -498,10 +498,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be
expanded");
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(),
"Existing file should be expanded");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
- assertEquals(140, readRowKeysFromParquet(jsc.hadoopConfiguration(),
newFile).size(),
+ assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain 140 records");
- List<GenericRecord> records =
ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile);
+ List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf,
newFile);
for (GenericRecord record : records) {
String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime2,
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(), "only expect
commit2");
@@ -521,7 +521,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed.");
- HoodieTableMetaClient metadata = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metadata = new HoodieTableMetaClient(hadoopConf,
basePath);
HoodieTable table = getHoodieTable(metadata, config);
BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
@@ -532,7 +532,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
for (HoodieBaseFile file : files) {
if (file.getFileName().contains(file1)) {
assertEquals(commitTime3, file.getCommitTime(), "Existing file should
be expanded");
- records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new
Path(file.getPath()));
+ records = ParquetUtils.readAvroRecords(hadoopConf, new
Path(file.getPath()));
for (GenericRecord record : records) {
String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String recordCommitTime =
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
@@ -548,7 +548,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
assertEquals(0, keys2.size(), "All keys added in commit 2 must be
updated in commit3 correctly");
} else {
assertEquals(commitTime3, file.getCommitTime(), "New file must be
written for commit 3");
- records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new
Path(file.getPath()));
+ records = ParquetUtils.readAvroRecords(hadoopConf, new
Path(file.getPath()));
for (GenericRecord record : records) {
String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertEquals(commitTime3,
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString(),
@@ -589,7 +589,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
- readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath,
statuses.get(0).getStat().getPath()))
+ readRowKeysFromParquet(hadoopConf, new Path(basePath,
statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Second, set of Inserts should just expand file1
@@ -605,10 +605,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
assertEquals(file1, statuses.get(0).getFileId(), "Existing file should be
expanded");
assertEquals(commitTime1, statuses.get(0).getStat().getPrevCommit(),
"Existing file should be expanded");
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
- assertEquals(140, readRowKeysFromParquet(jsc.hadoopConfiguration(),
newFile).size(),
+ assertEquals(140, readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain 140 records");
- List<GenericRecord> records =
ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile);
+ List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf,
newFile);
for (GenericRecord record : records) {
String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
String recCommitTime =
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
@@ -627,7 +627,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed.");
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
HoodieTable table = getHoodieTable(metaClient, config);
List<HoodieBaseFile> files = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(testPartitionPath,
commitTime3).collect(Collectors.toList());
@@ -636,7 +636,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
int totalInserts = 0;
for (HoodieBaseFile file : files) {
assertEquals(commitTime3, file.getCommitTime(), "All files must be at
commit 3");
- records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), new
Path(file.getPath()));
+ records = ParquetUtils.readAvroRecords(hadoopConf, new
Path(file.getPath()));
totalInserts += records.size();
}
assertEquals(totalInserts, inserts1.size() + inserts2.size() +
insert3.size(),
@@ -670,7 +670,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
- readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath,
statuses.get(0).getStat().getPath()))
+ readRowKeysFromParquet(hadoopConf, new Path(basePath,
statuses.get(0).getStat().getPath()))
.size(), "file should contain 100 records");
// Delete 20 among 100 inserted
@@ -763,10 +763,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
assertEquals(exepctedRecords,
- readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(),
+ readRowKeysFromParquet(hadoopConf, newFile).size(),
"file should contain 110 records");
- List<GenericRecord> records =
ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile);
+ List<GenericRecord> records = ParquetUtils.readAvroRecords(hadoopConf,
newFile);
for (GenericRecord record : records) {
String recordKey =
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
assertTrue(keys.contains(recordKey), "key expected to be part of " +
instantTime);
@@ -808,7 +808,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
String instantTime = "000";
@@ -855,7 +855,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
HoodieWriteClient client = getHoodieWriteClient(cfg);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
String instantTime = "000";
client.startCommitWithTime(instantTime);
@@ -926,7 +926,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
*/
@Test
public void testConsistencyCheckDuringFinalize() throws Exception {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
String instantTime = "000";
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
HoodieWriteClient client = getHoodieWriteClient(cfg);
@@ -944,7 +944,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
TestHoodieClientBase {
@Test
public void testRollbackAfterConsistencyCheckFailure() throws Exception {
String instantTime = "000";
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
HoodieWriteClient client = getHoodieWriteClient(cfg);
testConsistencyCheck(metaClient, instantTime);
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 5c470cb..54e5e8b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -81,7 +81,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
@Test
public void readLocalWriteHDFS() throws Exception {
// Initialize table and filesystem
- HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(),
dfsBasePath, HoodieTableType.valueOf(tableType),
+ HoodieTableMetaClient.initTableType(hadoopConf, dfsBasePath,
HoodieTableType.valueOf(tableType),
tableName, HoodieAvroPayload.class.getName());
// Create write client to write some records in
@@ -106,7 +106,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
assertEquals(readRecords.count(), records.size(), "Should contain 100
records");
// Write to local
- HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(),
tablePath, HoodieTableType.valueOf(tableType),
+ HoodieTableMetaClient.initTableType(hadoopConf, tablePath,
HoodieTableType.valueOf(tableType),
tableName, HoodieAvroPayload.class.getName());
String writeCommitTime = localWriteClient.startCommit();
diff --git
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index 62d5505..2a24c21 100644
---
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -144,7 +144,7 @@ public abstract class HoodieClientTestHarness extends
HoodieCommonTestHarness im
throw new IllegalStateException("The Spark context has not been
initialized.");
}
- initFileSystemWithConfiguration(jsc.hadoopConfiguration());
+ initFileSystemWithConfiguration(hadoopConf);
}
/**
@@ -181,7 +181,7 @@ public abstract class HoodieClientTestHarness extends
HoodieCommonTestHarness im
throw new IllegalStateException("The Spark context has not been
initialized.");
}
- metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
getTableType());
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType());
}
/**
diff --git
a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 42a0b97..d1122cf 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -112,7 +112,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness
{
public void setUp() throws Exception {
// Initialize a local spark env
initSparkContexts("TestHbaseIndex");
- jsc.hadoopConfiguration().addResource(utility.getConfiguration());
+ hadoopConf.addResource(utility.getConfiguration());
// Create a temp folder as the base path
initPath();
diff --git
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index a6f69b6..8b93828 100644
---
a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++
b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -259,7 +259,7 @@ public class TestHoodieBloomIndex extends
HoodieClientTestHarness {
List<String> uuids =
Arrays.asList(record1.getRecordKey(), record2.getRecordKey(),
record3.getRecordKey(), record4.getRecordKey());
- List<String> results =
HoodieKeyLookupHandle.checkCandidatesAgainstFile(jsc.hadoopConfiguration(),
uuids,
+ List<String> results =
HoodieKeyLookupHandle.checkCandidatesAgainstFile(hadoopConf, uuids,
new Path(basePath + "/2016/01/31/" + filename));
assertEquals(results.size(), 2);
assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
diff --git
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index d56f297..bfd5946 100644
---
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -62,7 +62,7 @@ public class TestHoodieCommitArchiveLog extends
HoodieClientTestHarness {
initPath();
initSparkContexts("TestHoodieCommitArchiveLog");
hadoopConf = dfs.getConf();
- jsc.hadoopConfiguration().addResource(dfs.getConf());
+ hadoopConf.addResource(dfs.getConf());
dfs.mkdirs(new Path(basePath));
metaClient = HoodieTestUtils.init(hadoopConf, basePath);
}
diff --git
a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 601f64a..29adf5e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -85,7 +85,7 @@ public class TestHoodieMergeHandle extends
HoodieClientTestHarness {
// Build a write config with bulkinsertparallelism set
HoodieWriteConfig cfg = getConfigBuilder().build();
try (HoodieWriteClient client = getWriteClient(cfg);) {
- FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+ FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
/**
* Write 1 (only inserts) This will do a bulk insert of 44 records of
which there are 2 records repeated 21 times
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 2efdeb5..8445518 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -567,7 +567,7 @@ public class TestCleaner extends TestHoodieClientBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
HoodieTableType.MERGE_ON_READ);
+ HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
// Make 3 files, one base file and 2 log files associated with base file
String file1P0 =
@@ -1010,7 +1010,7 @@ public class TestCleaner extends TestHoodieClientBase {
private void testPendingCompactions(HoodieWriteConfig config, int
expNumFilesDeleted,
int expNumFilesUnderCompactionDeleted, boolean retryFailure) throws
IOException {
HoodieTableMetaClient metaClient =
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
HoodieTableType.MERGE_ON_READ);
+ HoodieTestUtils.init(hadoopConf, basePath,
HoodieTableType.MERGE_ON_READ);
String[] instants = new String[] {"000", "001", "003", "005", "007",
"009", "011", "013"};
String[] compactionInstants = new String[] {"002", "004", "006", "008",
"010"};
Map<String, String> expFileIdToPendingCompaction = new HashMap<>();
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 57f5edb..bcdffba 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -100,19 +100,19 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
public void init() throws IOException {
initDFS();
initSparkContexts("TestHoodieMergeOnReadTable");
- jsc.hadoopConfiguration().addResource(dfs.getConf());
+ hadoopConf.addResource(dfs.getConf());
initPath();
dfs.mkdirs(new Path(basePath));
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
HoodieTableType.MERGE_ON_READ);
+ HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
initTestDataGenerator();
// initialize parquet input format
roInputFormat = new HoodieParquetInputFormat();
- roJobConf = new JobConf(jsc.hadoopConfiguration());
+ roJobConf = new JobConf(hadoopConf);
roInputFormat.setConf(roJobConf);
rtInputFormat = new HoodieParquetRealtimeInputFormat();
- rtJobConf = new JobConf(jsc.hadoopConfiguration());
+ rtJobConf = new JobConf(hadoopConf);
rtInputFormat.setConf(rtJobConf);
}
@@ -307,7 +307,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime).collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg,
hadoopConf);
Option<HoodieInstant> deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -375,7 +375,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
public void testCOWToMORConvertedTableRollback() throws Exception {
// Set TableType to COW
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
HoodieTableType.COPY_ON_WRITE);
+ HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieWriteConfig cfg = getConfig(true);
try (HoodieWriteClient client = getWriteClient(cfg);) {
@@ -393,7 +393,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
// verify there are no errors
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
Option<HoodieInstant> commit =
metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertTrue(commit.isPresent());
assertEquals("001", commit.get().getTimestamp(), "commit should be 001");
@@ -411,7 +411,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// Set TableType to MOR
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
HoodieTableType.MERGE_ON_READ);
+ HoodieTestUtils.init(hadoopConf, basePath,
HoodieTableType.MERGE_ON_READ);
// rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);
@@ -448,7 +448,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg,
hadoopConf);
Option<HoodieInstant> deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -595,7 +595,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg,
hadoopConf);
Option<HoodieInstant> deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -758,7 +758,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime).collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg,
hadoopConf);
Option<HoodieInstant> deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -832,7 +832,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
writeClient.insert(recordsRDD, newCommitTime).collect();
// Update all the 100 records
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime);
@@ -907,7 +907,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
writeClient.commit(newCommitTime, statuses);
HoodieTable table =
- HoodieTable.create(new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf);
+ HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath),
config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
@@ -968,7 +968,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
// We will test HUDI-204 here. We will simulate rollback happening twice
by copying the commit file to local fs
// and calling rollback twice
final String lastCommitTime = newCommitTime;
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
.filter(instant ->
instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
String fileName = last.getFileName();
@@ -1017,7 +1017,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
statuses.collect();
HoodieTable table =
- HoodieTable.create(new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf);
+ HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath),
config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
@@ -1038,7 +1038,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
// Trigger a rollback of compaction
writeClient.rollback(newCommitTime);
- table = HoodieTable.create(new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath), config, hadoopConf);
+ table = HoodieTable.create(new HoodieTableMetaClient(hadoopConf,
basePath), config, hadoopConf);
tableRTFileSystemView = table.getSliceView();
((SyncableFileSystemView) tableRTFileSystemView).reset();
Option<HoodieInstant> lastInstant = ((SyncableFileSystemView)
tableRTFileSystemView).getLastInstant();
@@ -1058,7 +1058,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
HoodieWriteConfig cfg = getConfigBuilder(false,
IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getWriteClient(cfg);) {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
// Create a commit without rolling stats in metadata to test backwards
compatibility
@@ -1157,7 +1157,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
public void testRollingStatsWithSmallFileHandling() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false,
IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getWriteClient(cfg);) {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
basePath);
Map<String, Long> fileIdToInsertsMap = new HashMap<>();
Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
@@ -1304,7 +1304,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords,
newCommitTime).collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable)
HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -1397,7 +1397,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
List<WriteStatus> statuses = client.insert(writeRecords,
commitTime).collect();
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
@@ -1450,7 +1450,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
private FileStatus[] getROIncrementalFiles(String partitionPath, String
startCommitTime, int numCommitsToPull, boolean stopAtCompaction)
throws Exception {
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
HoodieTableType.MERGE_ON_READ);
+ HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(roJobConf, startCommitTime, numCommitsToPull,
stopAtCompaction);
FileInputFormat.setInputPaths(roJobConf, Paths.get(basePath,
partitionPath).toString());
return roInputFormat.listStatus(roJobConf);
@@ -1463,7 +1463,7 @@ public class TestMergeOnReadTable extends
HoodieClientTestHarness {
private FileStatus[] getRTIncrementalFiles(String partitionPath, String
startCommitTime, int numCommitsToPull)
throws Exception {
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
HoodieTableType.MERGE_ON_READ);
+ HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
setupIncremental(rtJobConf, startCommitTime, numCommitsToPull, false);
FileInputFormat.setInputPaths(rtJobConf, Paths.get(basePath,
partitionPath).toString());
return rtInputFormat.listStatus(rtJobConf);
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 54fb555..d39df24 100644
---
a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++
b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -163,13 +163,13 @@ public class TestCopyOnWriteActionExecutor extends
HoodieClientTestHarness {
// Read out the bloom filter and make sure filter can answer record exist or not
Path parquetFilePath = allFiles[0].getPath();
- BloomFilter filter =
ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(),
parquetFilePath);
+ BloomFilter filter =
ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf, parquetFilePath);
for (HoodieRecord record : records) {
assertTrue(filter.mightContain(record.getRecordKey()));
}
// Read the parquet file, check the record content
- List<GenericRecord> fileRecords =
ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), parquetFilePath);
+ List<GenericRecord> fileRecords = ParquetUtils.readAvroRecords(hadoopConf,
parquetFilePath);
GenericRecord newRecord;
int index = 0;
for (GenericRecord record : fileRecords) {
@@ -205,7 +205,7 @@ public class TestCopyOnWriteActionExecutor extends
HoodieClientTestHarness {
// Check whether the record has been updated
Path updatedParquetFilePath = allFiles[0].getPath();
BloomFilter updatedFilter =
-
ParquetUtils.readBloomFilterFromParquetMetadata(jsc.hadoopConfiguration(),
updatedParquetFilePath);
+ ParquetUtils.readBloomFilterFromParquetMetadata(hadoopConf,
updatedParquetFilePath);
for (HoodieRecord record : records) {
// No change to the _row_key
assertTrue(updatedFilter.mightContain(record.getRecordKey()));
@@ -234,9 +234,9 @@ public class TestCopyOnWriteActionExecutor extends
HoodieClientTestHarness {
throws Exception {
// initialize parquet input format
HoodieParquetInputFormat hoodieInputFormat = new
HoodieParquetInputFormat();
- JobConf jobConf = new JobConf(jsc.hadoopConfiguration());
+ JobConf jobConf = new JobConf(hadoopConf);
hoodieInputFormat.setConf(jobConf);
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath,
HoodieTableType.COPY_ON_WRITE);
+ HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
setupIncremental(jobConf, startCommitTime, numCommitsToPull);
FileInputFormat.setInputPaths(jobConf, Paths.get(basePath,
partitionPath).toString());
return hoodieInputFormat.listStatus(jobConf);
diff --git
a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index f348f0c..88fdae8 100644
---
a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++
b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -107,7 +107,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
@@ -118,14 +118,14 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
// Reload and rollback inflight compaction
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg,
hadoopConf);
// hoodieTable.rollback(jsc,
// new HoodieInstant(true, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), false);
client.rollbackInflightCompaction(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION,
compactionInstantTime), hoodieTable);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
pendingCompactionInstant =
metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
.getInstants().findFirst().get();
assertEquals("compaction", pendingCompactionInstant.getAction());
@@ -163,10 +163,10 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
createNextDeltaCommit(inflightInstantTime, records, client, metaClient,
cfg, true);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertEquals(compactionInstantTime,
pendingCompactionInstant.getTimestamp(),
@@ -179,7 +179,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
client.startCommitWithTime(nextInflightInstantTime);
// Validate
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
inflightInstant =
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals(inflightInstant.getTimestamp(), nextInflightInstantTime,
"inflight instant has expected instant time");
assertEquals(1, metaClient.getActiveTimeline()
@@ -211,7 +211,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
new ArrayList<>());
// Schedule and mark compaction instant as inflight
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
@@ -244,7 +244,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertEquals(compactionInstantTime,
pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has
expected instant time");
@@ -273,10 +273,10 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
records = runNextDeltaCommits(client, readClient,
Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
createNextDeltaCommit(inflightInstantTime, records, client, metaClient,
cfg, true);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals(inflightInstantTime, inflightInstant.getTimestamp(),
"inflight instant has expected instant time");
@@ -338,7 +338,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime,
secondInstantTime), records, cfg, true,
new ArrayList<>());
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable,
cfg, numRecs, false);
}
@@ -362,7 +362,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
records = runNextDeltaCommits(client, readClient,
Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
@@ -379,7 +379,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
private void validateDeltaCommit(String latestDeltaCommit,
final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>>
fgIdToCompactionOperation,
HoodieWriteConfig cfg) {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieTable table = getHoodieTable(metaClient, cfg);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
fileSliceList.forEach(fileSlice -> {
@@ -400,7 +400,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
List<HoodieRecord> records,
HoodieWriteConfig cfg, boolean insertFirst, List<String>
expPendingCompactionInstants)
throws Exception {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
List<Pair<String, HoodieCompactionPlan>> pendingCompactions =
readClient.getPendingCompactions();
List<String> gotPendingCompactionInstants =
pendingCompactions.stream().map(pc ->
pc.getKey()).sorted().collect(Collectors.toList());
@@ -422,7 +422,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
client.commit(firstInstant, statuses);
}
assertNoWriteErrors(statusList);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
List<HoodieBaseFile> dataFilesToRead =
getCurrentLatestDataFiles(hoodieTable, cfg);
assertTrue(dataFilesToRead.stream().findAny().isPresent(),
@@ -433,7 +433,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
int numRecords = records.size();
for (String instantTime : deltaInstants) {
records = dataGen.generateUpdates(instantTime, numRecords);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
cfg.getBasePath());
+ metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
createNextDeltaCommit(instantTime, records, client, metaClient, cfg,
false);
validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
}
@@ -441,7 +441,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
}
private void moveCompactionFromRequestedToInflight(String
compactionInstantTime, HoodieWriteConfig cfg) {
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieInstant compactionInstant =
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
HoodieInstant instant =
metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
@@ -452,7 +452,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
private void scheduleCompaction(String compactionInstantTime,
HoodieWriteClient client, HoodieWriteConfig cfg)
throws IOException {
client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
- HoodieTableMetaClient metaClient = new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath());
HoodieInstant instant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
assertEquals(compactionInstantTime, instant.getTimestamp(), "Last
compaction instant must be the one set");
}
@@ -484,7 +484,7 @@ public class TestAsyncCompaction extends
TestHoodieClientBase {
}
// verify that there is a commit
- table = getHoodieTable(new
HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true), cfg);
+ table = getHoodieTable(new HoodieTableMetaClient(hadoopConf,
cfg.getBasePath(), true), cfg);
HoodieTimeline timeline =
table.getMetaClient().getCommitTimeline().filterCompletedInstants();
String latestCompactionCommitTime =
timeline.lastInstant().get().getTimestamp();
assertEquals(latestCompactionCommitTime, compactionInstantTime,