This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c9fcf96  [HUDI-1315] Adding builder for HoodieTableMetaClient initialization (#2534)
c9fcf96 is described below
commit c9fcf964b2bae56a54cb72951c8d8999eb323ed6
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Feb 19 20:54:26 2021 -0500
[HUDI-1315] Adding builder for HoodieTableMetaClient initialization (#2534)
---
.../main/java/org/apache/hudi/cli/HoodieCLI.java | 4 +-
.../apache/hudi/cli/commands/CommitsCommand.java | 4 +-
.../hudi/cli/commands/FileSystemViewCommand.java | 2 +-
.../org/apache/hudi/cli/commands/SparkMain.java | 6 +-
.../org/apache/hudi/cli/commands/TableCommand.java | 2 +-
.../scala/org/apache/hudi/cli/DedupeSparkJob.scala | 4 +-
.../apache/hudi/client/AbstractHoodieClient.java | 6 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 2 +-
.../org/apache/hudi/table/HoodieFlinkTable.java | 11 ++-
.../org/apache/hudi/table/HoodieJavaTable.java | 11 ++-
.../org/apache/hudi/client/HoodieReadClient.java | 4 +-
.../org/apache/hudi/table/HoodieSparkTable.java | 11 ++-
.../hudi/client/TestCompactionAdminClient.java | 12 ++--
.../TestHoodieClientOnCopyOnWriteStorage.java | 12 ++--
.../java/org/apache/hudi/client/TestMultiFS.java | 4 +-
.../hudi/metadata/TestHoodieBackedMetadata.java | 10 +--
.../table/action/compact/CompactionTestBase.java | 16 ++---
.../table/action/compact/TestAsyncCompaction.java | 26 +++----
.../table/action/compact/TestInlineCompaction.java | 36 +++++-----
.../hudi/testutils/HoodieClientTestBase.java | 8 +--
.../hudi/testutils/HoodieClientTestHarness.java | 2 +-
.../hudi/testutils/HoodieClientTestUtils.java | 2 +-
.../hudi/testutils/HoodieMergeOnReadTestUtils.java | 2 +-
.../hudi/common/table/HoodieTableMetaClient.java | 83 +++++++++++++++-------
.../table/log/AbstractHoodieLogRecordScanner.java | 2 +-
.../hudi/common/table/log/LogReaderUtils.java | 2 +-
.../common/table/view/FileSystemViewManager.java | 2 +-
.../apache/hudi/metadata/BaseTableMetadata.java | 2 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 2 +-
.../table/timeline/TestHoodieActiveTimeline.java | 7 +-
.../table/view/TestIncrementalFSViewSync.java | 10 +--
.../hudi/common/testutils/CompactionTestUtils.java | 2 +-
.../hudi/common/testutils/FileCreateUtils.java | 2 +-
.../common/testutils/HoodieCommonTestHarness.java | 2 +-
.../hudi/common/util/TestCompactionUtils.java | 4 +-
.../hudi/hadoop/HoodieROTablePathFilter.java | 2 +-
.../realtime/AbstractRealtimeRecordReader.java | 2 +-
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 2 +-
.../integ/testsuite/dag/nodes/CompactNode.java | 5 +-
.../integ/testsuite/dag/nodes/RollbackNode.java | 5 +-
.../testsuite/dag/nodes/ScheduleCompactNode.java | 5 +-
.../reader/DFSHoodieDatasetInputReader.java | 3 +-
.../testsuite/job/TestHoodieTestSuiteJob.java | 8 +--
.../internal/DataSourceInternalWriterHelper.java | 2 +-
.../org/apache/hudi/HoodieDataSourceHelpers.java | 2 +-
.../main/scala/org/apache/hudi/DefaultSource.scala | 8 +--
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 3 +-
.../org/apache/hudi/HoodieStreamingSink.scala | 4 +-
.../sql/hudi/streaming/HoodieStreamSource.scala | 2 +-
.../src/test/java/HoodieJavaStreamingApp.java | 4 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 12 ++--
.../hudi/functional/TestStructuredStreaming.scala | 9 ++-
.../hudi/sync/common/AbstractSyncHoodieClient.java | 2 +-
.../hudi/utilities/HiveIncrementalPuller.java | 4 +-
.../apache/hudi/utilities/HoodieClusteringJob.java | 2 +-
.../hudi/utilities/HoodieCompactionAdminTool.java | 2 +-
.../hudi/utilities/HoodieSnapshotCopier.java | 2 +-
.../hudi/utilities/HoodieSnapshotExporter.java | 4 +-
...heckpointFromAnotherHoodieTimelineProvider.java | 2 +-
.../hudi/utilities/deltastreamer/DeltaSync.java | 3 +-
.../deltastreamer/HoodieDeltaStreamer.java | 4 +-
.../hudi/utilities/perf/TimelineServerPerf.java | 2 +-
.../sources/helpers/IncrSourceHelper.java | 2 +-
.../functional/TestHoodieDeltaStreamer.java | 20 +++---
64 files changed, 241 insertions(+), 203 deletions(-)
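For orientation before the diffs: this commit replaces HoodieTableMetaClient's positional constructors with a fluent builder. Below is a minimal sketch of the new initialization path. Only the builder() entry point and the setter names are taken from the diff; the wrapper class, the import paths, and the use of ConsistencyGuardConfig.newBuilder().build() and TimelineLayoutVersion.CURR_VERSION as illustrative defaults are assumptions, not part of this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.fs.ConsistencyGuardConfig;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
    import org.apache.hudi.common.util.Option;

    // Hypothetical wrapper, for illustration only.
    public class MetaClientBuilderSketch {
      public static HoodieTableMetaClient openTable(Configuration conf, String basePath) {
        // Old style, removed by this commit:
        //   new HoodieTableMetaClient(conf, basePath, true, guardConfig, Option.of(layoutVersion));
        // New style: each positional argument becomes a named setter on the builder.
        return HoodieTableMetaClient.builder()
            .setConf(conf)                        // Hadoop configuration used for filesystem access
            .setBasePath(basePath)                // base path of the Hudi table
            .setLoadActiveTimelineOnLoad(true)    // eagerly load the active timeline
            .setConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().build())
            .setLayoutVersion(Option.of(new TimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)))
            .build();
      }
    }

Callers that only need a configuration and base path can stop after setConf/setBasePath, as most call sites in the diff below do.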
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
index a4059e1..04dedc5 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
@@ -85,8 +85,8 @@ public class HoodieCLI {
}
public static void refreshTableMetadata() {
- setTableMetaClient(new HoodieTableMetaClient(HoodieCLI.conf, basePath, false, HoodieCLI.consistencyGuardConfig,
- Option.of(layoutVersion)));
+ setTableMetaClient(HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(basePath).setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(HoodieCLI.consistencyGuardConfig)
+ .setLayoutVersion(Option.of(layoutVersion)).build());
}
public static void connectTo(String basePath, Integer layoutVersion) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 70e2029..3e216b4 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -401,7 +401,7 @@ public class CommitsCommand implements CommandMarker {
public String compareCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {
HoodieTableMetaClient source = HoodieCLI.getTableMetaClient();
- HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.conf, path);
+ HoodieTableMetaClient target = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
String targetLatestCommit =
@@ -426,7 +426,7 @@ public class CommitsCommand implements CommandMarker {
@CliCommand(value = "commits sync", help = "Compare commits with another Hoodie table")
public String syncCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {
- HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, path);
+ HoodieCLI.syncTableMetadata = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
HoodieCLI.state = HoodieCLI.CLIState.SYNC;
return "Load sync state between " + HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
+ HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index ef76ee4..37bc651 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -237,7 +237,7 @@ public class FileSystemViewCommand implements CommandMarker {
boolean includeMaxInstant, boolean includeInflight, boolean excludeCompaction) throws IOException {
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
HoodieTableMetaClient metaClient =
- new HoodieTableMetaClient(client.getHadoopConf(), client.getBasePath(), true);
+ HoodieTableMetaClient.builder().setConf(client.getHadoopConf()).setBasePath(client.getBasePath()).setLoadActiveTimelineOnLoad(true).build();
FileSystem fs = HoodieCLI.fs;
String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index f715b16..0f0a851 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -402,8 +402,10 @@ public class SparkMain {
*/
protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
HoodieWriteConfig config = getWriteConfig(basePath);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), false,
- config.getConsistencyGuardConfig(), Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
+ HoodieTableMetaClient metaClient =
+ HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
+ .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
try {
new SparkUpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc)).run(metaClient, HoodieTableVersion.valueOf(toVersion), config, new HoodieSparkEngineContext(jsc), null);
LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index 9c947e4..168de26 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -95,7 +95,7 @@ public class TableCommand implements CommandMarker {
boolean existing = false;
try {
- new HoodieTableMetaClient(HoodieCLI.conf, path);
+ HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
existing = true;
} catch (TableNotFoundException dfe) {
// expected
diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
index 96944c5..04c5f93 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
@@ -75,7 +75,7 @@ class DedupeSparkJob(basePath: String,
val tmpTableName = s"htbl_${System.currentTimeMillis()}"
val dedupeTblName = s"${tmpTableName}_dupeKeys"
- val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
+ val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
@@ -184,7 +184,7 @@ class DedupeSparkJob(basePath: String,
}
def fixDuplicates(dryRun: Boolean = true) = {
- val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
+ val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index 765965f..0266a65 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -128,9 +128,9 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
}
protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
- return new HoodieTableMetaClient(hadoopConf, config.getBasePath(), loadActiveTimelineOnLoad,
- config.getConsistencyGuardConfig(),
- Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
+ return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
+ .setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
}
public Option<EmbeddedTimelineService> getTimelineServer() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1f76a5e..be1fc3a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -106,7 +106,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
initRegistry();
- HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
+ HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
initialize(engineContext, datasetMetaClient);
if (enabled) {
// This is always called even in case the table was created for the first time. This is because
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 6b7c4a6..79a3106 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -43,13 +43,10 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
}
public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
- context.getHadoopConf().get(),
- config.getBasePath(),
- true,
- config.getConsistencyGuardConfig(),
- Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
- );
+ HoodieTableMetaClient metaClient =
+ HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
+ .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
return HoodieFlinkTable.create(config, context, metaClient);
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index bd8f954..219dec4 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -42,13 +42,10 @@ public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
}
public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
- context.getHadoopConf().get(),
- config.getBasePath(),
- true,
- config.getConsistencyGuardConfig(),
- Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
- );
+ HoodieTableMetaClient metaClient =
+ HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
+ .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 4fb9f22..f8cc757 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -97,7 +97,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
this.hadoopConf = context.getHadoopConf().get();
final String basePath = clientConfig.getBasePath();
// Create a Hoodie table which encapsulated the commits and files visible
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
this.index = SparkHoodieIndex.createIndex(clientConfig);
this.sqlContextOpt = Option.empty();
@@ -199,7 +199,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
*/
public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
HoodieTableMetaClient metaClient =
- new HoodieTableMetaClient(hadoopConf, hoodieTable.getMetaClient().getBasePath(), true);
+ HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(hoodieTable.getMetaClient().getBasePath()).setLoadActiveTimelineOnLoad(true).build();
return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
.map(
instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index dd8106f..70a57b7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -42,13 +42,10 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
}
public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
- context.getHadoopConf().get(),
- config.getBasePath(),
- true,
- config.getConsistencyGuardConfig(),
- Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
- );
+ HoodieTableMetaClient metaClient =
+ HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
+ .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+ .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index e59a950..67d8257 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -135,7 +135,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
int expNumRepairs) throws Exception {
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRepairs, true);
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
List<ValidationOpResult> result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
if (expNumRepairs > 0) {
assertTrue(result.stream().anyMatch(r -> !r.isSuccess()), "Expect some failures in validation");
@@ -176,7 +176,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
* @param compactionInstant Compaction Instant
*/
private void ensureValidCompactionPlan(String compactionInstant) throws Exception {
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
// Ensure compaction-plan is good to begin with
List<ValidationOpResult> validationResults = client.validateCompactionPlan(metaClient, compactionInstant, 1);
assertFalse(validationResults.stream().anyMatch(v -> !v.isSuccess()),
@@ -234,7 +234,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
// Log files belonging to file-slices created because of compaction request must be renamed
@@ -270,7 +270,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
client.unscheduleCompactionPlan(compactionInstant, false, 1, false);
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
@@ -306,7 +306,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
// Check suggested rename operations
List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles = client
.getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, Option.empty(), false);
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
// Log files belonging to file-slices created because of compaction request must be renamed
@@ -331,7 +331,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
// Call the main unschedule API
client.unscheduleCompactionFileId(op.getFileGroupId(), false, false);
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTableFileSystemView newFsView =
new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
// Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index b4a392e..c81aa11 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -410,7 +410,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
final HoodieWriteConfig cfg = hoodieWriteConfig;
final String instantTime = "007";
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
String basePathStr = basePath;
HoodieTable table = getHoodieTable(metaClient, cfg);
jsc.parallelize(Arrays.asList(1)).map(e -> {
@@ -894,7 +894,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertNoWriteErrors(statuses);
assertEquals(2, statuses.size(), "2 files needs to be committed.");
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(hadoopConf, basePath);
+ HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTable table = getHoodieTable(metadata, config);
BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
@@ -1001,7 +1001,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
+ readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
"file should contain 340 records");
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTable table = getHoodieTable(metaClient, config);
List<HoodieBaseFile> files = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
@@ -1428,7 +1428,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
String instantTime = "000";
@@ -1533,7 +1533,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
String instantTime = "000";
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
.withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
@@ -1559,7 +1559,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard) throws Exception {
String instantTime = "000";
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
.withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build()
:
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 34daed7..b0bd54f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -94,7 +94,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
// Read from hdfs
FileSystem fs = FSUtils.getFs(dfsBasePath, HoodieTestUtils.getDefaultHadoopConf());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), dfsBasePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(dfsBasePath).build();
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime);
assertEquals(readRecords.count(), records.size(), "Should contain 100 records");
@@ -112,7 +112,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
LOG.info("Reading from path: " + tablePath);
fs = FSUtils.getFs(tablePath, HoodieTestUtils.getDefaultHadoopConf());
- metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath);
+ metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
Dataset<Row> localReadRecords =
HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index c238fc0..f822e85 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -118,13 +118,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// Metadata table should not exist until created for the first time
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
- assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
+ assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
// Metadata table is not created if disabled by config
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
client.startCommitWithTime("001");
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
- assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
+ assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
}
// Metadata table created when enabled by config & sync is called
@@ -565,8 +565,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
}
}
- HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
- HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, config.getBasePath());
+ HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+ HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath()).build();
HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline();
// check that there are compactions.
assertTrue(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants() > 0);
@@ -869,7 +869,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
// Metadata table should be in sync with the dataset
assertTrue(metadata(client).isInSync());
- HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
+ HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
// Metadata table is MOR
assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index 0f6a150..b8bccbc 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -88,7 +88,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
**/
protected void validateDeltaCommit(String latestDeltaCommit, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
HoodieWriteConfig cfg) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable table = getHoodieTable(metaClient, cfg);
List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
fileSliceList.forEach(fileSlice -> {
@@ -109,7 +109,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
List<HoodieRecord> records,
HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
throws Exception {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
List<String> gotPendingCompactionInstants =
pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
@@ -131,7 +131,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
client.commit(firstInstant, statuses);
}
assertNoWriteErrors(statusList);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
List<HoodieBaseFile> dataFilesToRead = getCurrentLatestBaseFiles(hoodieTable);
assertTrue(dataFilesToRead.stream().findAny().isPresent(),
@@ -142,7 +142,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
int numRecords = records.size();
for (String instantTime : deltaInstants) {
records = dataGen.generateUpdates(instantTime, numRecords);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
}
@@ -150,7 +150,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
}
protected void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
@@ -160,7 +160,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
protected void scheduleCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieWriteConfig cfg) {
client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
}
@@ -192,7 +192,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
}
// verify that there is a commit
- table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+ table = getHoodieTable(HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).setLoadActiveTimelineOnLoad(true).build(), cfg);
HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
assertEquals(latestCompactionCommitTime, compactionInstantTime,
@@ -214,7 +214,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
"Compacted files should not show up in latest slices");
// verify that there is a commit
- table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+ table = getHoodieTable(HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).setLoadActiveTimelineOnLoad(true).build(), cfg);
HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
// verify compaction commit is visible in timeline
assertTrue(timeline.filterCompletedInstants().getInstants()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 08f9283..6e8326c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -75,7 +75,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
@@ -86,12 +86,12 @@ public class TestAsyncCompaction extends CompactionTestBase {
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
// Reload and rollback inflight compaction
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
client.rollbackInflightCompaction(
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
.getInstants().findFirst().get();
assertEquals("compaction", pendingCompactionInstant.getAction());
@@ -129,10 +129,10 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
@@ -145,7 +145,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
client.startCommitWithTime(nextInflightInstantTime);
// Validate
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
inflightInstant =
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals(inflightInstant.getTimestamp(), nextInflightInstantTime, "inflight instant has expected instant time");
assertEquals(1, metaClient.getActiveTimeline()
@@ -177,7 +177,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
new ArrayList<>());
// Schedule and mark compaction instant as inflight
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
@@ -210,7 +210,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
// Schedule compaction but do not run them
scheduleCompaction(compactionInstantTime, client, cfg);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant pendingCompactionInstant =
metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
@@ -239,10 +239,10 @@ public class TestAsyncCompaction extends CompactionTestBase {
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieInstant inflightInstant =
metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
assertEquals(inflightInstantTime, inflightInstant.getTimestamp(), "inflight instant has expected instant time");
@@ -304,7 +304,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false);
}
@@ -328,7 +328,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
@@ -356,7 +356,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
new ArrayList<>());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
scheduleCompaction(compactionInstantTime, client, cfg);
metaClient.reloadActiveTimeline();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 80542ed..97d2875 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -57,7 +57,7 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
// Then: ensure no compaction is executed since there are only 2 delta commits
assertEquals(2, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
@@ -76,12 +76,12 @@ public class TestInlineCompaction extends CompactionTestBase {
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
// third commit, that will trigger compaction
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
String finalInstant = HoodieActiveTimeline.createNewInstantTime();
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
// Then: ensure the file slices are compacted as per policy
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
}
@@ -100,11 +100,11 @@ public class TestInlineCompaction extends CompactionTestBase {
// after 10s, that will trigger compaction
String finalInstant = HoodieActiveTimeline.createNewInstantTime(10000);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
// Then: ensure the file slices are compacted as per policy
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
}
@@ -121,17 +121,17 @@ public class TestInlineCompaction extends CompactionTestBase {
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
// Then: trigger the compaction because reach 3 commits.
String finalInstant = HoodieActiveTimeline.createNewInstantTime();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
// 4th commit, that will trigger compaction because reach the time elapsed
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(6, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
}
}
@@ -145,16 +145,16 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
List<String> instants = IntStream.range(0, 3).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
// Then: ensure no compaction is executed since there are only 3 delta commits
assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
// 4th commit, that will trigger compaction
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
String finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(5, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
}
}
@@ -183,12 +183,12 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60, CompactionTriggerStrategy.NUM_COMMITS);
String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
createNextDeltaCommit(instantTime3, dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg, false);
}
// Then: 1 delta commit is done, the failed compaction is retried
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}
@@ -218,13 +218,13 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
String instantTime2;
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
instantTime2 = HoodieActiveTimeline.createNewInstantTime();
createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
}
// Then: 1 delta commit is done, the failed compaction is retried
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}
@@ -255,13 +255,13 @@ public class TestInlineCompaction extends CompactionTestBase {
HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(3, 20, CompactionTriggerStrategy.NUM_OR_TIME);
String instantTime2;
try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
instantTime2 = HoodieActiveTimeline.createNewInstantTime();
createNextDeltaCommit(instantTime2, dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, false);
}
// Then: 1 delta commit is done, the failed compaction is retried
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
assertEquals(instantTime, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 1caf9c0..1104631 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -220,7 +220,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
return (commit, numRecords) -> {
final SparkHoodieIndex index = SparkHoodieIndex.createIndex(writeConfig);
List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
- final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
+ final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), context, table);
return taggedRecords.collect();
@@ -241,7 +241,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
return (numRecords) -> {
final SparkHoodieIndex index = SparkHoodieIndex.createIndex(writeConfig);
List<HoodieKey> records = keyGenFunction.apply(numRecords);
- final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
+ final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
@@ -438,7 +438,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
assertPartitionMetadataForRecords(records, fs);
// verify that there is a commit
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
if (assertForCommit) {
@@ -506,7 +506,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
assertPartitionMetadataForKeys(keysToDelete, fs);
// verify that there is a commit
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
if (assertForCommit) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index e6523af..f3febab 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -346,7 +346,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
}
public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
- metaClient = new HoodieTableMetaClient(conf, basePath);
+ metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
return metaClient;
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index c91b51b..55c5aa7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -151,7 +151,7 @@ public class HoodieClientTestUtils {
String... paths) {
List<HoodieBaseFile> latestFiles = new ArrayList<>();
try {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
for (String path : paths) {
BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new Path(path)));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
index 5633551..5b37b3b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
@@ -66,7 +66,7 @@ public class HoodieMergeOnReadTestUtils {
public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf, List<String> inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema,
String rawHiveColumnTypes, boolean projectCols, List<String> projectedColumns) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(), realtime, jobConf);
Schema schema = HoodieAvroUtils.addMetadataFields(rawSchema);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 32ed7d9..983678d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -94,26 +94,7 @@ public class HoodieTableMetaClient implements Serializable {
private HoodieArchivedTimeline archivedTimeline;
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
- public HoodieTableMetaClient(Configuration conf, String basePath) {
-   // Do not load any timeline by default
-   this(conf, basePath, false);
- }
-
- public HoodieTableMetaClient(Configuration conf, String basePath, String payloadClassName) {
-   this(conf, basePath, false, ConsistencyGuardConfig.newBuilder().build(), Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION),
-     payloadClassName);
- }
-
- public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
-   ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion) {
-   this(conf, basePath, loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, null);
- }
-
- public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad) {
-   this(conf, basePath, loadActiveTimelineOnLoad, ConsistencyGuardConfig.newBuilder().build(), Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION), null);
- }
-
- public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
+ private HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
    ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion, String payloadClassName) {
LOG.info("Loading HoodieTableMetaClient from " + basePath);
@@ -152,9 +133,8 @@ public class HoodieTableMetaClient implements Serializable {
public HoodieTableMetaClient() {}
public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
- return new HoodieTableMetaClient(oldMetaClient.hadoopConf.get(), oldMetaClient.basePath,
-   oldMetaClient.loadActiveTimelineOnLoad, oldMetaClient.consistencyGuardConfig,
-   Option.of(oldMetaClient.timelineLayoutVersion), null);
+ return HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad)
+   .setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null).build();
}
/**
@@ -471,7 +451,7 @@ public class HoodieTableMetaClient implements Serializable {
HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
// We should not use fs.getConf as this might be different from the original configuration
// used to create the fs in unit tests
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
return metaClient;
}
@@ -645,4 +625,59 @@ public class HoodieTableMetaClient implements Serializable {
public void setActiveTimeline(HoodieActiveTimeline activeTimeline) {
this.activeTimeline = activeTimeline;
}
+
+ public static Builder builder() {
+   return new Builder();
+ }
+
+ /**
+  * Builder for {@link HoodieTableMetaClient}.
+  */
+ public static class Builder {
+
+   private Configuration conf;
+   private String basePath;
+   private boolean loadActiveTimelineOnLoad = false;
+   private String payloadClassName = null;
+   private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
+   private Option<TimelineLayoutVersion> layoutVersion = Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
+
+   public Builder setConf(Configuration conf) {
+     this.conf = conf;
+     return this;
+   }
+
+   public Builder setBasePath(String basePath) {
+     this.basePath = basePath;
+     return this;
+   }
+
+   public Builder setLoadActiveTimelineOnLoad(boolean loadActiveTimelineOnLoad) {
+     this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
+     return this;
+   }
+
+   public Builder setPayloadClassName(String payloadClassName) {
+     this.payloadClassName = payloadClassName;
+     return this;
+   }
+
+   public Builder setConsistencyGuardConfig(ConsistencyGuardConfig consistencyGuardConfig) {
+     this.consistencyGuardConfig = consistencyGuardConfig;
+     return this;
+   }
+
+   public Builder setLayoutVersion(Option<TimelineLayoutVersion> layoutVersion) {
+     this.layoutVersion = layoutVersion;
+     return this;
+   }
+
+   public HoodieTableMetaClient build() {
+     ValidationUtils.checkArgument(conf != null, "Configuration needs to be set to init HoodieTableMetaClient");
+     ValidationUtils.checkArgument(basePath != null, "basePath needs to be set to init HoodieTableMetaClient");
+     return new HoodieTableMetaClient(conf, basePath, loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName);
+   }
+ }
+
}
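
The builder above replaces the public constructors removed at the top of this file, and reload(...) now routes through it as well. A minimal usage sketch (the path and payload class below are illustrative, not taken from this patch): setConf and setBasePath are the only required fields, enforced by the ValidationUtils checks in build(); everything else falls back to the defaults declared in the Builder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    Configuration conf = new Configuration();
    // Minimal form: defaults give loadActiveTimelineOnLoad=false, no payload class,
    // a fresh ConsistencyGuardConfig and the current timeline layout version.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(conf)
        .setBasePath("/tmp/hoodie/sample_table")  // illustrative path
        .build();
    // Equivalent of the removed (conf, basePath, payloadClassName) constructor:
    HoodieTableMetaClient withPayload = HoodieTableMetaClient.builder()
        .setConf(conf)
        .setBasePath("/tmp/hoodie/sample_table")
        .setPayloadClassName("org.apache.hudi.common.model.OverwriteWithLatestAvroPayload")  // illustrative class name
        .build();

setConsistencyGuardConfig and setLayoutVersion chain the same way when non-default values are needed.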
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 6fb0a05..02874e6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -109,7 +109,7 @@ public abstract class AbstractHoodieLogRecordScanner {
String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
- this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath);
+ this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
// load class from the payload fully qualified class name
this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
this.totalLogFiles.addAndGet(logFilePaths.size());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index ffc4b85..fe159df 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -64,7 +64,7 @@ public class LogReaderUtils {
public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, Configuration config)
throws IOException {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(config, basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build();
List<String> deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s)))
.sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString())
.collect(Collectors.toList());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 6f0e7d5..f614df5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -94,7 +94,7 @@ public class FileSystemViewManager {
*/
public SyncableFileSystemView getFileSystemView(String basePath) {
return globalViewMap.computeIfAbsent(basePath, (path) -> {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), path);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(conf.newCopy()).setBasePath(path).build();
return viewCreator.apply(metaClient, viewStorageConfig);
});
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index c86b37e..b4143f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -71,7 +71,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
this.engineContext = engineContext;
this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf());
this.datasetBasePath = datasetBasePath;
- this.datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
+ this.datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(datasetBasePath).build();
this.spillableMapDirectory = spillableMapDirectory;
this.metadataConfig = metadataConfig;
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index a34652c..3285606 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -93,7 +93,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
if (enabled && this.metaClient == null) {
this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
try {
- this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
+ this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build();
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
latestFileSystemMetadataSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
} catch (TableNotFoundException e) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index d7e3bde..3ed111c 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -114,9 +114,10 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
metaClient.getArchivePath(), metaClient.getTableConfig().getPayloadClass(), VERSION_0);
HoodieInstant instant6 = new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "9");
byte[] dummy = new byte[5];
- HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(new HoodieTableMetaClient(metaClient.getHadoopConf(),
-   metaClient.getBasePath(), true, metaClient.getConsistencyGuardConfig(),
-   Option.of(new TimelineLayoutVersion(VERSION_0))));
+ HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(
+   HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath())
+     .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(metaClient.getConsistencyGuardConfig())
+     .setLayoutVersion(Option.of(new TimelineLayoutVersion(VERSION_0))).build());
// Old Timeline writes both to aux and timeline folder
oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy));
// Now use latest timeline version
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index 146e0bb..0bcebaf 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -324,7 +324,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
instantsToFiles = testMultipleWriteSteps(view1, Collections.singletonList("11"), true, "11");
SyncableFileSystemView view2 =
- getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+ getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
// Run 2 more ingestion on MOR table. View1 is not yet synced but View2 is
instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("12", "13"), true, "11"));
@@ -334,7 +334,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
view2.sync();
SyncableFileSystemView view3 =
- getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+ getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
view3.sync();
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size());
@@ -346,7 +346,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
view1.sync();
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size());
SyncableFileSystemView view4 =
- getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+ getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
view4.sync();
/*
@@ -360,7 +360,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
view1.sync();
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2);
SyncableFileSystemView view5 =
- getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+ getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
view5.sync();
/*
@@ -383,7 +383,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
view1.sync();
areViewsConsistent(view1, view2, partitions.size() * fileIdsPerPartition.size() * 2);
SyncableFileSystemView view6 =
- getFileSystemView(new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+ getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
view6.sync();
/*
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
index 44e3da0..fb5f123 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
@@ -110,7 +110,7 @@ public class CompactionTestUtils {
}
});
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath(), true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).setLoadActiveTimelineOnLoad(true).build();
Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> pendingCompactionMap =
CompactionUtils.getAllPendingCompactionOperations(metaClient);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index bca91f8..2cca148 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -224,7 +224,7 @@ public class FileCreateUtils {
public static Map<String, Long> getBaseFileCountsForPaths(String basePath, FileSystem fs, String... paths) {
Map<String, Long> toReturn = new HashMap<>();
try {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
for (String path : paths) {
TableFileSystemView.BaseFileOnlyView fileSystemView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), fs.globStatus(new org.apache.hadoop.fs.Path(path)));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 25b2c8b..9738816 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -83,7 +83,7 @@ public class HoodieCommonTestHarness {
}
protected void refreshFsView() throws IOException {
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
}
protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) throws IOException {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index 35ff4cb..92c40c7 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -188,7 +188,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
// schedule similar plan again so that there will be duplicates
plan1.getOperations().get(0).setDataFilePath("bla");
scheduleCompaction(metaClient, "005", plan1);
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
assertThrows(IllegalStateException.class, () -> {
CompactionUtils.getAllPendingCompactionOperations(metaClient);
});
@@ -203,7 +203,7 @@ public class TestCompactionUtils extends HoodieCommonTestHarness {
scheduleCompaction(metaClient, "003", plan2);
// schedule same plan again so that there will be duplicates. It should not fail as it is a full duplicate
scheduleCompaction(metaClient, "005", plan1);
- metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+ metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> res = CompactionUtils.getAllPendingCompactionOperations(metaClient);
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index da45fa6..d94018b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -171,7 +171,7 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
try {
HoodieTableMetaClient metaClient = metaClientCache.get(baseDir.toString());
if (null == metaClient) {
- metaClient = new HoodieTableMetaClient(fs.getConf(), baseDir.toString(), true);
+ metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).setLoadActiveTimelineOnLoad(true).build();
metaClientCache.put(baseDir.toString(), metaClient);
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 050b91a..f378f44 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -71,7 +71,7 @@ public abstract class AbstractRealtimeRecordReader {
}
private boolean usesCustomPayload() {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, split.getBasePath());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
return !(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
|| metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload"));
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 9f98136..eac9f4d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -324,7 +324,7 @@ public class HoodieInputFormatUtils {
}
Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
LOG.info("Reading hoodie metadata from path " + baseDir.toString());
- return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
+ return HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).build();
}
public static FileStatus getFileStatus(HoodieBaseFile baseFile) throws IOException {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
index 7c9090d..8043066 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
@@ -45,8 +45,9 @@ public class CompactNode extends DagNode<JavaRDD<WriteStatus>> {
*/
@Override
public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
-   executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
+ HoodieTableMetaClient metaClient =
+   HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath)
+   .build();
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline()
.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().lastInstant();
if (lastInstant.isPresent()) {
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
index 1824cb8..c8cb628 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
@@ -49,8 +49,9 @@ public class RollbackNode extends DagNode<Option<HoodieInstant>> {
log.info("Executing rollback node {}", this.getName());
// Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer
// testing for now
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
-   executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
+ HoodieTableMetaClient metaClient =
+   HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath)
+   .build();
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
if (lastInstant.isPresent()) {
log.info("Rolling back last instant {}", lastInstant.get());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
index c54b25a..62bf9b0 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
@@ -41,8 +41,9 @@ public class ScheduleCompactNode extends DagNode<Option<String>> {
// testing for now
// Find the last commit and extract the extra metadata to be passed to the schedule compaction. This is
// done to ensure the CHECKPOINT is correctly passed from commit to commit
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
-   executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
+ HoodieTableMetaClient metaClient =
+   HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath)
+   .build();
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
if (lastInstant.isPresent()) {
HoodieCommitMetadata metadata = org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(metaClient
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 136aa27..1acbc90 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -37,7 +37,6 @@ import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
@@ -80,7 +79,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
public DFSHoodieDatasetInputReader(JavaSparkContext jsc, String basePath, String schemaStr) {
this.jsc = jsc;
this.schemaStr = schemaStr;
- this.metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ this.metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
}
protected List<String> getPartitions(Option<Integer> partitionsLimit) throws IOException {
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index b14ef18..6232b1d 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -173,7 +173,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
cfg.workloadDagGenerator = ComplexDagGenerator.class.getName();
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), cfg.targetBasePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(), 2);
}
@@ -192,7 +192,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
}
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), cfg.targetBasePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(), 1);
}
@@ -207,7 +207,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
cfg.workloadYamlPath = dfsBasePath + "/" + COW_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), cfg.targetBasePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
//assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(), 5);
}
@@ -222,7 +222,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
cfg.workloadYamlPath = dfsBasePath + "/" + MOR_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new Configuration(), cfg.targetBasePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
//assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(), 7);
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index b40d36b..7b04a65 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -63,7 +63,7 @@ public class DataSourceInternalWriterHelper {
this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig, true);
writeClient.setOperationType(operationType);
writeClient.startCommitWithTime(instantTime);
- this.metaClient = new HoodieTableMetaClient(configuration, writeConfig.getBasePath());
+ this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
index 734e0c0..ce80b52 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
@@ -70,7 +70,7 @@ public class HoodieDataSourceHelpers {
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, String basePath) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
return metaClient.getActiveTimeline().getTimelineOfActions(
CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
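
Since allCompletedCommitsCompactions is annotated @PublicAPIMethod(maturity = STABLE), external callers are untouched by this internal swap. A typical call, sketched with an illustrative base path and the FSUtils helper used elsewhere in this patch, stays:

    FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
    HoodieTimeline completed = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, basePath);
    Option<HoodieInstant> latest = completed.lastInstant();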
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index ef9bf8c..3299b8f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -84,7 +84,7 @@ class DefaultSource extends RelationProvider
val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
log.info("Obtained hudi table path: " + tablePath)
- val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+ val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent
log.info("Is bootstrapped table => " + isBootstrappedTable)
@@ -104,7 +104,7 @@ class DefaultSource extends RelationProvider
} else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
- val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+ val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient)
} else {
@@ -202,8 +202,8 @@ class DefaultSource extends RelationProvider
if (path.isEmpty || path.get == null) {
throw new HoodieException(s"'path' must be specified.")
}
- val metaClient = new HoodieTableMetaClient(
-   sqlContext.sparkSession.sessionState.newHadoopConf(), path.get)
+ val metaClient = HoodieTableMetaClient.builder().setConf(
+   sqlContext.sparkSession.sessionState.newHadoopConf()).setBasePath(path.get).build()
val schemaResolver = new TableSchemaResolver(metaClient)
val sqlSchema =
try {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ad185cb..f5ba6c8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -500,7 +500,8 @@ private[hudi] object HoodieSparkSqlWriter {
hoodieTableConfigOpt: Option[HoodieTableConfig]): HoodieTableConfig = {
if (tableExists) {
hoodieTableConfigOpt.getOrElse(
- new HoodieTableMetaClient(sparkContext.hadoopConfiguration, tablePath).getTableConfig)
+ HoodieTableMetaClient.builder().setConf(sparkContext.hadoopConfiguration).setBasePath(tablePath)
+   .build().getTableConfig)
} else {
null
}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 846212d..f9a799e 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -175,8 +175,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
}))
// First time, scan .hoodie folder and get all pending compactions
- val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration,
-   client.getConfig.getBasePath)
+ val metaClient = HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration)
+   .setBasePath(client.getConfig.getBasePath).build()
val pendingInstants :java.util.List[HoodieInstant] = CompactionUtils.getPendingCompactionInstantTimes(metaClient)
pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index c17598a..b0e3163 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -59,7 +59,7 @@ class HoodieStreamSource(
val fs = path.getFileSystem(hadoopConf)
TablePathUtils.getTablePath(fs, path).get()
}
- private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+ private lazy val metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tablePath.toString).build()
private lazy val tableType = metaClient.getTableType
@transient private var lastOffset: HoodieSourceOffset = _
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 1df12a3..3b55434 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -187,7 +187,7 @@ public class HoodieJavaStreamingApp {
executor.shutdownNow();
}
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), tablePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(tablePath).build();
if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
// Ensure we have successfully completed one compaction commit
ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().getInstants().count() == 1);
@@ -249,7 +249,7 @@ public class HoodieJavaStreamingApp {
if (timeline.countInstants() >= numCommits) {
return;
}
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
System.out.println("Instants :" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
} catch (TableNotFoundException te) {
LOG.info("Got table not found exception. Retrying");
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 8fdb02d..856cc00 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -210,7 +210,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
.mode(SaveMode.Append)
.save(basePath)
- val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+ val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+   .setLoadActiveTimelineOnLoad(true).build();
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
assertEquals(2, commits.size)
@@ -235,7 +236,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
.mode(SaveMode.Overwrite)
.save(basePath)
- val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+ val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+   .setLoadActiveTimelineOnLoad(true).build()
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
assertEquals(2, commits.size)
@@ -289,7 +291,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size
assertEquals(7, filterSecondPartitionCount)
- val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+ val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+   .setLoadActiveTimelineOnLoad(true).build()
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
assertEquals(3, commits.size)
@@ -339,7 +342,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size
assertEquals(7, filterSecondPartitionCount)
- val metaClient = new HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+ val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+   .setLoadActiveTimelineOnLoad(true).build()
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
assertEquals(2, commits.size)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 08e1c82..49dff4d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -177,7 +177,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
numInstants = timeline.countInstants
success = true
}
- val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true)
+ val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath)
+   .setLoadActiveTimelineOnLoad(true).build()
} catch {
case te: TableNotFoundException =>
log.info("Got table not found exception. Retrying")
@@ -253,12 +254,14 @@ class TestStructuredStreaming extends HoodieClientTestBase {
if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, destPath).getCompletedReplaceTimeline().countInstants() > 0) {
assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// check have at least one file group
- this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
+ this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
+   .setLoadActiveTimelineOnLoad(true).build()
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
} else {
assertEquals(currNumCommits, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// check have more than one file group
- this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
+ this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
+   .setLoadActiveTimelineOnLoad(true).build()
assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
}
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 8d03252..8477ed6 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -53,7 +53,7 @@ public abstract class AbstractSyncHoodieClient {
public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, boolean useFileListingFromMetadata,
boolean verifyMetadataFileListing, FileSystem fs) {
- this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+ this.metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
this.tableType = metaClient.getTableType();
this.basePath = basePath;
this.assumeDatePartitioning = assumeDatePartitioning;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
index 7d356a5..a3570b1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
@@ -276,7 +276,7 @@ public class HiveIncrementalPuller {
if (!fs.exists(new Path(targetDataPath)) || !fs.exists(new Path(targetDataPath + "/.hoodie"))) {
return "0";
}
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), targetDataPath);
+ HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(targetDataPath).build();
Option<HoodieInstant> lastCommit = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
@@ -309,7 +309,7 @@ public class HiveIncrementalPuller {
}
private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) {
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), sourceTableLocation);
+ HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(sourceTableLocation).build();
List<String> commitsToSync = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
.findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants().map(HoodieInstant::getTimestamp)
.collect(Collectors.toList());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 394771c..44328d3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -137,7 +137,7 @@ public class HoodieClusteringJob {
}
private String getSchemaFromLatestInstant() throws Exception {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build();
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
throw new HoodieException("Cannot run clustering without any completed commits");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
index d3e4dba..d7642c4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
@@ -60,7 +60,7 @@ public class HoodieCompactionAdminTool {
* Executes one of compaction admin operations.
*/
public void run(JavaSparkContext jsc) throws Exception {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).build();
try (CompactionAdminClient admin = new CompactionAdminClient(new HoodieSparkEngineContext(jsc), cfg.basePath)) {
final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index ece9b8c..72d1dbd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -83,7 +83,7 @@ public class HoodieSnapshotCopier implements Serializable {
final boolean verifyMetadataFileListing) throws IOException {
FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
- final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
+ final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir).build();
final BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants());
HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index b9f32cb..8e792a0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -148,7 +148,7 @@ public class HoodieSnapshotExporter {
}
private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
- final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+ final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline()
.filterCompletedInstants().lastInstant();
return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty();
@@ -259,7 +259,7 @@ public class HoodieSnapshotExporter {
private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
- HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+ HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
return new HoodieTableFileSystemView(tableMetadata, tableMetadata
.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants());
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java
index 17058da..e2554c4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java
@@ -44,7 +44,7 @@ public class InitialCheckpointFromAnotherHoodieTimelineProvider extends InitialC
@Override
public void init(Configuration config) throws HoodieException {
super.init(config);
- this.anotherDsHoodieMetaclient = new HoodieTableMetaClient(config, path.toString());
+ this.anotherDsHoodieMetaclient = HoodieTableMetaClient.builder().setConf(config).setBasePath(path.toString()).build();
}
@Override
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 9eb5c9d..4c494dc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -223,8 +223,7 @@ public class DeltaSync implements Serializable {
*/
public void refreshTimeline() throws IOException {
if (fs.exists(new Path(cfg.targetBasePath))) {
- HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath,
- cfg.payloadClassName);
+ HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build();
switch (meta.getTableType()) {
case COPY_ON_WRITE:
this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
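(The DeltaSync hunk above also carries the record payload class through the builder. A sketch of that variant, assuming fs is a Hadoop FileSystem and the two strings come from the streamer config; the class and method names are illustrative.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;

class DeltaSyncMetaClientSketch {
  // The old (conf, basePath, payloadClassName) constructor maps onto setPayloadClassName.
  static HoodieTableMetaClient openWithPayload(FileSystem fs, String targetBasePath, String payloadClassName) {
    return HoodieTableMetaClient.builder()
        .setConf(new Configuration(fs.getConf())) // copy the conf, as the hunk above does
        .setBasePath(targetBasePath)
        .setPayloadClassName(payloadClassName)
        .build();
  }
}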
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 10d9453..6e3a024 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -528,7 +528,7 @@ public class HoodieDeltaStreamer implements Serializable {
if (fs.exists(new Path(cfg.targetBasePath))) {
HoodieTableMetaClient meta =
- new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, false);
+ HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
tableType = meta.getTableType();
// This will guarantee there is no surprise with table type
ValidationUtils.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)),
@@ -636,7 +636,7 @@ public class HoodieDeltaStreamer implements Serializable {
asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient));
// Enqueue existing pending compactions first
HoodieTableMetaClient meta =
- new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
+ HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingCompaction(hoodieInstant));
asyncCompactService.get().start((error) -> {
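(The HoodieDeltaStreamer hunks above show the third variant: the old trailing boolean of new HoodieTableMetaClient(conf, path, true/false) becomes setLoadActiveTimelineOnLoad. A sketch under the same assumptions; jssc is a JavaSparkContext and the class name is illustrative.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.spark.api.java.JavaSparkContext;

class EagerTimelineSketch {
  static HoodieTableMetaClient openEager(JavaSparkContext jssc, String targetBasePath) {
    return HoodieTableMetaClient.builder()
        .setConf(new Configuration(jssc.hadoopConfiguration()))
        .setBasePath(targetBasePath)
        .setLoadActiveTimelineOnLoad(true) // eagerly load the active timeline at build time
        .build();
  }
}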
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index d296f0e..184d7c7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -100,7 +100,7 @@ public class TimelineServerPerf implements Serializable {
this.hostAddr = cfg.serverHost;
}
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(timelineServer.getConf(), cfg.basePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(timelineServer.getConf()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build();
SyncableFileSystemView fsView = new RemoteHoodieTableFileSystemView(this.hostAddr, cfg.serverPort, metaClient);
String reportDir = cfg.reportDir;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 96dc648..efe3a86 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -58,7 +58,7 @@ public class IncrSourceHelper {
int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) {
ValidationUtils.checkArgument(numInstantsPerFetch > 0,
"Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
- HoodieTableMetaClient srcMetaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), srcBasePath, true);
+ HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
final HoodieTimeline activeCommitTimeline = srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index eeef8ed..616d039 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -363,7 +363,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) {
- HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
+ HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numCompactionCommits = (int) timeline.getInstants().count();
@@ -371,7 +371,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
- HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
+ HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
@@ -380,7 +380,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits)
throws IOException {
- HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
+ HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieInstant lastInstant = timeline.lastInstant().get();
HoodieCommitMetadata commitMetadata =
@@ -408,7 +408,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) {
- HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
+ HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants();
LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
@@ -683,13 +683,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2"));
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
deltaStreamerTestRunner(ds, cfg, (r) -> {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
return completeReplaceSize > 0;
});
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
}
@@ -739,13 +739,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
} else {
LOG.warn("Schedule clustering failed");
}
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
return completeReplaceSize > 0;
});
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
}
@@ -921,7 +921,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
assertEquals(1000, counts.get(1).getLong(1));
// Test with empty commits
- HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
+ HoodieTableMetaClient mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
cfg2.filterDupes = false;
@@ -930,7 +930,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
ds2.sync();
- mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
+ mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), HoodieTimeline.GREATER_THAN, lastFinished.getTimestamp()
));
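(The migrated test helpers above pair the builder with timeline reads. A compact sketch of that usage, assuming a FileSystem fs and a table path as inputs; the class and method names are illustrative.)

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

class TimelineCountSketch {
  // Count completed commit instants the way the assert helpers above do.
  static long completedCommits(FileSystem fs, String tablePath) {
    HoodieTableMetaClient meta = HoodieTableMetaClient.builder()
        .setConf(fs.getConf())
        .setBasePath(tablePath)
        .build();
    HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
    return timeline.getInstants().count(); // getInstants() streams HoodieInstant objects
  }
}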