This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c9fcf96  [HUDI-1315] Adding builder for HoodieTableMetaClient initialization (#2534)
c9fcf96 is described below

commit c9fcf964b2bae56a54cb72951c8d8999eb323ed6
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Feb 19 20:54:26 2021 -0500

    [HUDI-1315] Adding builder for HoodieTableMetaClient initialization (#2534)
---
 .../main/java/org/apache/hudi/cli/HoodieCLI.java   |  4 +-
 .../apache/hudi/cli/commands/CommitsCommand.java   |  4 +-
 .../hudi/cli/commands/FileSystemViewCommand.java   |  2 +-
 .../org/apache/hudi/cli/commands/SparkMain.java    |  6 +-
 .../org/apache/hudi/cli/commands/TableCommand.java |  2 +-
 .../scala/org/apache/hudi/cli/DedupeSparkJob.scala |  4 +-
 .../apache/hudi/client/AbstractHoodieClient.java   |  6 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  2 +-
 .../org/apache/hudi/table/HoodieFlinkTable.java    | 11 ++-
 .../org/apache/hudi/table/HoodieJavaTable.java     | 11 ++-
 .../org/apache/hudi/client/HoodieReadClient.java   |  4 +-
 .../org/apache/hudi/table/HoodieSparkTable.java    | 11 ++-
 .../hudi/client/TestCompactionAdminClient.java     | 12 ++--
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 12 ++--
 .../java/org/apache/hudi/client/TestMultiFS.java   |  4 +-
 .../hudi/metadata/TestHoodieBackedMetadata.java    | 10 +--
 .../table/action/compact/CompactionTestBase.java   | 16 ++---
 .../table/action/compact/TestAsyncCompaction.java  | 26 +++----
 .../table/action/compact/TestInlineCompaction.java | 36 +++++-----
 .../hudi/testutils/HoodieClientTestBase.java       |  8 +--
 .../hudi/testutils/HoodieClientTestHarness.java    |  2 +-
 .../hudi/testutils/HoodieClientTestUtils.java      |  2 +-
 .../hudi/testutils/HoodieMergeOnReadTestUtils.java |  2 +-
 .../hudi/common/table/HoodieTableMetaClient.java   | 83 +++++++++++++++-------
 .../table/log/AbstractHoodieLogRecordScanner.java  |  2 +-
 .../hudi/common/table/log/LogReaderUtils.java      |  2 +-
 .../common/table/view/FileSystemViewManager.java   |  2 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  2 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  2 +-
 .../table/timeline/TestHoodieActiveTimeline.java   |  7 +-
 .../table/view/TestIncrementalFSViewSync.java      | 10 +--
 .../hudi/common/testutils/CompactionTestUtils.java |  2 +-
 .../hudi/common/testutils/FileCreateUtils.java     |  2 +-
 .../common/testutils/HoodieCommonTestHarness.java  |  2 +-
 .../hudi/common/util/TestCompactionUtils.java      |  4 +-
 .../hudi/hadoop/HoodieROTablePathFilter.java       |  2 +-
 .../realtime/AbstractRealtimeRecordReader.java     |  2 +-
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  2 +-
 .../integ/testsuite/dag/nodes/CompactNode.java     |  5 +-
 .../integ/testsuite/dag/nodes/RollbackNode.java    |  5 +-
 .../testsuite/dag/nodes/ScheduleCompactNode.java   |  5 +-
 .../reader/DFSHoodieDatasetInputReader.java        |  3 +-
 .../testsuite/job/TestHoodieTestSuiteJob.java      |  8 +--
 .../internal/DataSourceInternalWriterHelper.java   |  2 +-
 .../org/apache/hudi/HoodieDataSourceHelpers.java   |  2 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  8 +--
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  3 +-
 .../org/apache/hudi/HoodieStreamingSink.scala      |  4 +-
 .../sql/hudi/streaming/HoodieStreamSource.scala    |  2 +-
 .../src/test/java/HoodieJavaStreamingApp.java      |  4 +-
 .../apache/hudi/functional/TestCOWDataSource.scala | 12 ++--
 .../hudi/functional/TestStructuredStreaming.scala  |  9 ++-
 .../hudi/sync/common/AbstractSyncHoodieClient.java |  2 +-
 .../hudi/utilities/HiveIncrementalPuller.java      |  4 +-
 .../apache/hudi/utilities/HoodieClusteringJob.java |  2 +-
 .../hudi/utilities/HoodieCompactionAdminTool.java  |  2 +-
 .../hudi/utilities/HoodieSnapshotCopier.java       |  2 +-
 .../hudi/utilities/HoodieSnapshotExporter.java     |  4 +-
 ...heckpointFromAnotherHoodieTimelineProvider.java |  2 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  3 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  4 +-
 .../hudi/utilities/perf/TimelineServerPerf.java    |  2 +-
 .../sources/helpers/IncrSourceHelper.java          |  2 +-
 .../functional/TestHoodieDeltaStreamer.java        | 20 +++---
 64 files changed, 241 insertions(+), 203 deletions(-)
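
For readers skimming the patch: every call site moves from the positional HoodieTableMetaClient constructor to the new fluent builder. A minimal sketch of the pattern, distilled from the call sites below (conf, basePath, consistencyGuardConfig and layoutVersion are illustrative placeholders, not names taken from this patch):

    // Before: positional constructor; the bare boolean and Option arguments
    // are easy to misread at a call site.
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath, true,
        consistencyGuardConfig, Option.of(layoutVersion));

    // After: fluent builder; the simplest call sites in this patch set only
    // setConf and setBasePath, the remaining properties are optional.
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(conf)                         // Hadoop Configuration
        .setBasePath(basePath)                 // table base path
        .setLoadActiveTimelineOnLoad(true)     // eagerly load the active timeline
        .setConsistencyGuardConfig(consistencyGuardConfig)
        .setLayoutVersion(Option.of(layoutVersion))
        .build();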

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
index a4059e1..04dedc5 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
@@ -85,8 +85,8 @@ public class HoodieCLI {
   }
 
   public static void refreshTableMetadata() {
-    setTableMetaClient(new HoodieTableMetaClient(HoodieCLI.conf, basePath, false, HoodieCLI.consistencyGuardConfig,
-        Option.of(layoutVersion)));
+    setTableMetaClient(HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(basePath).setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(HoodieCLI.consistencyGuardConfig)
+        .setLayoutVersion(Option.of(layoutVersion)).build());
   }
 
   public static void connectTo(String basePath, Integer layoutVersion) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 70e2029..3e216b4 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -401,7 +401,7 @@ public class CommitsCommand implements CommandMarker {
  public String compareCommits(@CliOption(key = {"path"}, help = "Path of the table to compare to") final String path) {
 
     HoodieTableMetaClient source = HoodieCLI.getTableMetaClient();
-    HoodieTableMetaClient target = new HoodieTableMetaClient(HoodieCLI.conf, path);
+    HoodieTableMetaClient target = HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
     HoodieTimeline targetTimeline = target.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     HoodieTimeline sourceTimeline = source.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
     String targetLatestCommit =
@@ -426,7 +426,7 @@ public class CommitsCommand implements CommandMarker {
 
   @CliCommand(value = "commits sync", help = "Compare commits with another 
Hoodie table")
   public String syncCommits(@CliOption(key = {"path"}, help = "Path of the 
table to compare to") final String path) {
-    HoodieCLI.syncTableMetadata = new HoodieTableMetaClient(HoodieCLI.conf, 
path);
+    HoodieCLI.syncTableMetadata = 
HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
     HoodieCLI.state = HoodieCLI.CLIState.SYNC;
     return "Load sync state between " + 
HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
         + HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
index ef76ee4..37bc651 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
@@ -237,7 +237,7 @@ public class FileSystemViewCommand implements CommandMarker {
       boolean includeMaxInstant, boolean includeInflight, boolean excludeCompaction) throws IOException {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(client.getHadoopConf(), client.getBasePath(), true);
+        HoodieTableMetaClient.builder().setConf(client.getHadoopConf()).setBasePath(client.getBasePath()).setLoadActiveTimelineOnLoad(true).build();
     FileSystem fs = HoodieCLI.fs;
     String globPath = String.format("%s/%s/*", client.getBasePath(), globRegex);
     List<FileStatus> statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index f715b16..0f0a851 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -402,8 +402,10 @@ public class SparkMain {
    */
  protected static int upgradeOrDowngradeTable(JavaSparkContext jsc, String basePath, String toVersion) {
     HoodieWriteConfig config = getWriteConfig(basePath);
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), false,
-        config.getConsistencyGuardConfig(), Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
+    HoodieTableMetaClient metaClient =
+        HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(config.getBasePath())
+            .setLoadActiveTimelineOnLoad(false).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
     try {
       new SparkUpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc)).run(metaClient, HoodieTableVersion.valueOf(toVersion), config, new HoodieSparkEngineContext(jsc), null);
       LOG.info(String.format("Table at \"%s\" upgraded / downgraded to version \"%s\".", basePath, toVersion));
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index 9c947e4..168de26 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -95,7 +95,7 @@ public class TableCommand implements CommandMarker {
 
     boolean existing = false;
     try {
-      new HoodieTableMetaClient(HoodieCLI.conf, path);
+      HoodieTableMetaClient.builder().setConf(HoodieCLI.conf).setBasePath(path).build();
       existing = true;
     } catch (TableNotFoundException dfe) {
       // expected
diff --git a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
index 96944c5..04c5f93 100644
--- a/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
+++ b/hudi-cli/src/main/scala/org/apache/hudi/cli/DedupeSparkJob.scala
@@ -75,7 +75,7 @@ class DedupeSparkJob(basePath: String,
     val tmpTableName = s"htbl_${System.currentTimeMillis()}"
     val dedupeTblName = s"${tmpTableName}_dupeKeys"
 
-    val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
+    val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
 
     val allFiles = fs.listStatus(new org.apache.hadoop.fs.Path(s"$basePath/$duplicatedPartitionPath"))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
@@ -184,7 +184,7 @@ class DedupeSparkJob(basePath: String,
   }
 
   def fixDuplicates(dryRun: Boolean = true) = {
-    val metadata = new HoodieTableMetaClient(fs.getConf, basePath)
+    val metadata = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(basePath).build()
 
     val allFiles = fs.listStatus(new Path(s"$basePath/$duplicatedPartitionPath"))
     val fsView = new HoodieTableFileSystemView(metadata, metadata.getActiveTimeline.getCommitTimeline.filterCompletedInstants(), allFiles)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index 765965f..0266a65 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -128,9 +128,9 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
   }
 
   protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
-    return new HoodieTableMetaClient(hadoopConf, config.getBasePath(), loadActiveTimelineOnLoad,
-        config.getConsistencyGuardConfig(),
-        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())));
+    return HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath())
+        .setLoadActiveTimelineOnLoad(loadActiveTimelineOnLoad).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+        .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
   }
 
   public Option<EmbeddedTimelineService> getTimelineServer() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1f76a5e..be1fc3a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -106,7 +106,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
       ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
 
       initRegistry();
-      HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
+      HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(datasetWriteConfig.getBasePath()).build();
       initialize(engineContext, datasetMetaClient);
       if (enabled) {
         // This is always called even in case the table was created for the first time. This is because
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 6b7c4a6..79a3106 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -43,13 +43,10 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
   }
 
   public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
-        context.getHadoopConf().get(),
-        config.getBasePath(),
-        true,
-        config.getConsistencyGuardConfig(),
-        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
-    );
+    HoodieTableMetaClient metaClient =
+        HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
+            .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
     return HoodieFlinkTable.create(config, context, metaClient);
   }
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index bd8f954..219dec4 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -42,13 +42,10 @@ public abstract class HoodieJavaTable<T extends HoodieRecordPayload>
   }
 
   public static <T extends HoodieRecordPayload> HoodieJavaTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
-        context.getHadoopConf().get(),
-        config.getBasePath(),
-        true,
-        config.getConsistencyGuardConfig(),
-        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
-    );
+    HoodieTableMetaClient metaClient =
+        HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
+            .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
     return HoodieJavaTable.create(config, (HoodieJavaEngineContext) context, metaClient);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
index 4fb9f22..f8cc757 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieReadClient.java
@@ -97,7 +97,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
     this.hadoopConf = context.getHadoopConf().get();
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     this.hoodieTable = HoodieSparkTable.create(clientConfig, context, metaClient);
     this.index = SparkHoodieIndex.createIndex(clientConfig);
     this.sqlContextOpt = Option.empty();
@@ -199,7 +199,7 @@ public class HoodieReadClient<T extends HoodieRecordPayload> implements Serializ
    */
   public List<Pair<String, HoodieCompactionPlan>> getPendingCompactions() {
     HoodieTableMetaClient metaClient =
-        new HoodieTableMetaClient(hadoopConf, hoodieTable.getMetaClient().getBasePath(), true);
+        HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(hoodieTable.getMetaClient().getBasePath()).setLoadActiveTimelineOnLoad(true).build();
     return CompactionUtils.getAllPendingCompactionPlans(metaClient).stream()
         .map(
             instantWorkloadPair -> Pair.of(instantWorkloadPair.getKey().getTimestamp(), instantWorkloadPair.getValue()))
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index dd8106f..70a57b7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -42,13 +42,10 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
   }
 
   public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(
-        context.getHadoopConf().get(),
-        config.getBasePath(),
-        true,
-        config.getConsistencyGuardConfig(),
-        Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))
-    );
+    HoodieTableMetaClient metaClient =
+        HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
+            .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
+            .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion()))).build();
     return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index e59a950..67d8257 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -135,7 +135,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
       int expNumRepairs) throws Exception {
     List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
         validateUnSchedulePlan(client, ingestionInstant, compactionInstant, numEntriesPerInstant, expNumRepairs, true);
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     List<ValidationOpResult> result = client.validateCompactionPlan(metaClient, compactionInstant, 1);
     if (expNumRepairs > 0) {
       assertTrue(result.stream().anyMatch(r -> !r.isSuccess()), "Expect some failures in validation");
@@ -176,7 +176,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
    * @param compactionInstant Compaction Instant
    */
   private void ensureValidCompactionPlan(String compactionInstant) throws Exception {
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     // Ensure compaction-plan is good to begin with
     List<ValidationOpResult> validationResults = client.validateCompactionPlan(metaClient, compactionInstant, 1);
     assertFalse(validationResults.stream().anyMatch(v -> !v.isSuccess()),
@@ -234,7 +234,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
     // Check suggested rename operations
     List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
         client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
 
     // Log files belonging to file-slices created because of compaction request must be renamed
 
@@ -270,7 +270,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
 
     client.unscheduleCompactionPlan(compactionInstant, false, 1, false);
 
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     final HoodieTableFileSystemView newFsView =
         new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
     // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
@@ -306,7 +306,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
     // Check suggested rename operations
     List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles = client
         .getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op, Option.empty(), false);
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
 
     // Log files belonging to file-slices created because of compaction request must be renamed
 
@@ -331,7 +331,7 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
     // Call the main unschedule API
     client.unscheduleCompactionFileId(op.getFileGroupId(), false, false);
 
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), basePath, true);
+    metaClient = HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     final HoodieTableFileSystemView newFsView =
         new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
     // Expect all file-slice whose base-commit is same as compaction commit to contain no new Log files
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index b4a392e..c81aa11 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -410,7 +410,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     final HoodieWriteConfig cfg = hoodieWriteConfig;
     final String instantTime = "007";
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
     String basePathStr = basePath;
     HoodieTable table = getHoodieTable(metaClient, cfg);
     jsc.parallelize(Arrays.asList(1)).map(e -> {
@@ -894,7 +894,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     assertNoWriteErrors(statuses);
 
     assertEquals(2, statuses.size(), "2 files needs to be committed.");
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
 
     HoodieTable table = getHoodieTable(metadata, config);
     BaseFileOnlyView fileSystemView = table.getBaseFileOnlyView();
@@ -1001,7 +1001,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
             + readRowKeysFromParquet(hadoopConf, new Path(basePath, statuses.get(1).getStat().getPath())).size(),
         "file should contain 340 records");
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     HoodieTable table = getHoodieTable(metaClient, config);
     List<HoodieBaseFile> files = table.getBaseFileOnlyView()
         .getLatestBaseFilesBeforeOrOn(testPartitionPath, commitTime3).collect(Collectors.toList());
@@ -1428,7 +1428,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
       HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
 
       String instantTime = "000";
@@ -1533,7 +1533,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsistencyGuard) throws Exception {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     String instantTime = "000";
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
         .withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build();
@@ -1559,7 +1559,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
   private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard) throws Exception {
     String instantTime = "000";
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     HoodieWriteConfig cfg = !enableOptimisticConsistencyGuard ? getConfigBuilder().withRollbackUsingMarkers(rollbackUsingMarkers).withAutoCommit(false)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
             .withMaxConsistencyCheckIntervalMs(1).withInitialConsistencyCheckIntervalMs(1).withEnableOptimisticConsistencyGuard(enableOptimisticConsistencyGuard).build()).build() :
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 34daed7..b0bd54f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -94,7 +94,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
 
       // Read from hdfs
       FileSystem fs = FSUtils.getFs(dfsBasePath, HoodieTestUtils.getDefaultHadoopConf());
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), dfsBasePath);
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(dfsBasePath).build();
       HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
       Dataset<Row> readRecords = HoodieClientTestUtils.readCommit(dfsBasePath, sqlContext, timeline, readCommitTime);
       assertEquals(readRecords.count(), records.size(), "Should contain 100 records");
@@ -112,7 +112,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
 
       LOG.info("Reading from path: " + tablePath);
       fs = FSUtils.getFs(tablePath, HoodieTestUtils.getDefaultHadoopConf());
-      metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath);
+      metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
       timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
       Dataset<Row> localReadRecords =
           HoodieClientTestUtils.readCommit(tablePath, sqlContext, timeline, writeCommitTime);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index c238fc0..f822e85 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -118,13 +118,13 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
 
     // Metadata table should not exist until created for the first time
     assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
-    assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
+    assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
 
     // Metadata table is not created if disabled by config
     try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, false))) {
       client.startCommitWithTime("001");
       assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not be created");
-      assertThrows(TableNotFoundException.class, () -> new HoodieTableMetaClient(hadoopConf, metadataTableBasePath));
+      assertThrows(TableNotFoundException.class, () -> HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build());
     }
 
     // Metadata table created when enabled by config & sync is called
@@ -565,8 +565,8 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
       }
     }
 
-    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf, config.getBasePath());
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
+    HoodieTableMetaClient datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(config.getBasePath()).build();
     HoodieActiveTimeline metadataTimeline = metadataMetaClient.getActiveTimeline();
     // check that there are compactions.
     assertTrue(metadataTimeline.getCommitTimeline().filterCompletedInstants().countInstants() > 0);
@@ -869,7 +869,7 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
 
     // Metadata table should be in sync with the dataset
     assertTrue(metadata(client).isInSync());
-    HoodieTableMetaClient metadataMetaClient = new HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
+    HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
 
     // Metadata table is MOR
     assertEquals(metadataMetaClient.getTableType(), HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index 0f6a150..b8bccbc 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -88,7 +88,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
    **/
   protected void validateDeltaCommit(String latestDeltaCommit, final Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> fgIdToCompactionOperation,
                                      HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieTable table = getHoodieTable(metaClient, cfg);
     List<FileSlice> fileSliceList = getCurrentLatestFileSlices(table);
     fileSliceList.forEach(fileSlice -> {
@@ -109,7 +109,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
                                                    List<HoodieRecord> records, HoodieWriteConfig cfg, boolean insertFirst, List<String> expPendingCompactionInstants)
       throws Exception {
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     List<Pair<String, HoodieCompactionPlan>> pendingCompactions = readClient.getPendingCompactions();
     List<String> gotPendingCompactionInstants =
         pendingCompactions.stream().map(pc -> pc.getKey()).sorted().collect(Collectors.toList());
@@ -131,7 +131,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
         client.commit(firstInstant, statuses);
       }
       assertNoWriteErrors(statusList);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
       List<HoodieBaseFile> dataFilesToRead = getCurrentLatestBaseFiles(hoodieTable);
       assertTrue(dataFilesToRead.stream().findAny().isPresent(),
@@ -142,7 +142,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
     int numRecords = records.size();
     for (String instantTime : deltaInstants) {
       records = dataGen.generateUpdates(instantTime, numRecords);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       createNextDeltaCommit(instantTime, records, client, metaClient, cfg, false);
       validateDeltaCommit(instantTime, fgIdToCompactionOperation, cfg);
     }
@@ -150,7 +150,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
   }
 
   protected void moveCompactionFromRequestedToInflight(String compactionInstantTime, HoodieWriteConfig cfg) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant compactionInstant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
     metaClient.getActiveTimeline().transitionCompactionRequestedToInflight(compactionInstant);
     HoodieInstant instant = metaClient.getActiveTimeline().reload().filterPendingCompactionTimeline().getInstants()
@@ -160,7 +160,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
 
   protected void scheduleCompaction(String compactionInstantTime, SparkRDDWriteClient client, HoodieWriteConfig cfg) {
     client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant instant = metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().get();
     assertEquals(compactionInstantTime, instant.getTimestamp(), "Last compaction instant must be the one set");
   }
@@ -192,7 +192,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
     }
 
     // verify that there is a commit
-    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+    table = getHoodieTable(HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).setLoadActiveTimelineOnLoad(true).build(), cfg);
     HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
     String latestCompactionCommitTime = timeline.lastInstant().get().getTimestamp();
     assertEquals(latestCompactionCommitTime, compactionInstantTime,
@@ -214,7 +214,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
         "Compacted files should not show up in latest slices");
 
     // verify that there is a commit
-    table = getHoodieTable(new HoodieTableMetaClient(hadoopConf, cfg.getBasePath(), true), cfg);
+    table = getHoodieTable(HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).setLoadActiveTimelineOnLoad(true).build(), cfg);
     HoodieTimeline timeline = table.getMetaClient().getCommitTimeline().filterCompletedInstants();
     // verify compaction commit is visible in timeline
     assertTrue(timeline.filterCompletedInstants().getInstants()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 08f9283..6e8326c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -75,7 +75,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
       // Schedule compaction but do not run them
       scheduleCompaction(compactionInstantTime, client, cfg);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
 
       HoodieInstant pendingCompactionInstant =
           metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
@@ -86,12 +86,12 @@ public class TestAsyncCompaction extends CompactionTestBase {
       moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
 
       // Reload and rollback inflight compaction
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
 
       client.rollbackInflightCompaction(
           new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime), hoodieTable);
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       pendingCompactionInstant = metaClient.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline()
           .getInstants().findFirst().get();
       assertEquals("compaction", pendingCompactionInstant.getAction());
@@ -129,10 +129,10 @@ public class TestAsyncCompaction extends CompactionTestBase {
       // Schedule compaction but do not run them
       scheduleCompaction(compactionInstantTime, client, cfg);
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
 
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       HoodieInstant pendingCompactionInstant =
           metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
       assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(),
@@ -145,7 +145,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
       client.startCommitWithTime(nextInflightInstantTime);
 
       // Validate
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       inflightInstant = metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
       assertEquals(inflightInstant.getTimestamp(), nextInflightInstantTime, "inflight instant has expected instant time");
       assertEquals(1, metaClient.getActiveTimeline()
@@ -177,7 +177,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
           new ArrayList<>());
 
       // Schedule and mark compaction instant as inflight
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
       scheduleCompaction(compactionInstantTime, client, cfg);
       moveCompactionFromRequestedToInflight(compactionInstantTime, cfg);
@@ -210,7 +210,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
 
     // Schedule compaction but do not run them
     scheduleCompaction(compactionInstantTime, client, cfg);
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant pendingCompactionInstant =
         metaClient.getActiveTimeline().filterPendingCompactionTimeline().firstInstant().get();
     assertEquals(compactionInstantTime, pendingCompactionInstant.getTimestamp(), "Pending Compaction instant has expected instant time");
@@ -239,10 +239,10 @@ public class TestAsyncCompaction extends CompactionTestBase {
     records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
         new ArrayList<>());
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     createNextDeltaCommit(inflightInstantTime, records, client, metaClient, cfg, true);
 
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     HoodieInstant inflightInstant =
         metaClient.getActiveTimeline().filterPendingExcludingCompaction().firstInstant().get();
     assertEquals(inflightInstantTime, inflightInstant.getTimestamp(), "inflight instant has expected instant time");
@@ -304,7 +304,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
       runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
           new ArrayList<>());
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
       scheduleAndExecuteCompaction(compactionInstantTime, client, hoodieTable, cfg, numRecs, false);
     }
@@ -328,7 +328,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
      records = runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
           new ArrayList<>());
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
       scheduleCompaction(compactionInstantTime, client, cfg);
 
@@ -356,7 +356,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
       runNextDeltaCommits(client, readClient, Arrays.asList(firstInstantTime, secondInstantTime), records, cfg, true,
           new ArrayList<>());
 
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
       scheduleCompaction(compactionInstantTime, client, cfg);
       metaClient.reloadActiveTimeline();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
index 80542ed..97d2875 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestInlineCompaction.java
@@ -57,7 +57,7 @@ public class TestInlineCompaction extends CompactionTestBase {
       HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       List<String> instants = IntStream.range(0, 2).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
 
       // Then: ensure no compaction is executedm since there are only 2 delta commits
       assertEquals(2, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
@@ -76,12 +76,12 @@ public class TestInlineCompaction extends CompactionTestBase {
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
 
       // third commit, that will trigger compaction
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       String finalInstant = HoodieActiveTimeline.createNewInstantTime();
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
 
       // Then: ensure the file slices are compacted as per policy
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
       assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
     }
@@ -100,11 +100,11 @@ public class TestInlineCompaction extends CompactionTestBase {
 
       // after 10s, that will trigger compaction
       String finalInstant = HoodieActiveTimeline.createNewInstantTime(10000);
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 100), writeClient, metaClient, cfg, false);
 
       // Then: ensure the file slices are compacted as per policy
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
       assertEquals(HoodieTimeline.COMMIT_ACTION, metaClient.getActiveTimeline().lastInstant().get().getAction());
     }
@@ -121,17 +121,17 @@ public class TestInlineCompaction extends CompactionTestBase {
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
       // Then: trigger the compaction because reach 3 commits.
       String finalInstant = HoodieActiveTimeline.createNewInstantTime();
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
 
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
       // 4th commit, that will trigger compaction because reach the time elapsed
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
 
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(6, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     }
   }
@@ -145,16 +145,16 @@ public class TestInlineCompaction extends CompactionTestBase {
       HoodieReadClient readClient = getHoodieReadClient(cfg.getBasePath());
       List<String> instants = IntStream.range(0, 3).mapToObj(i -> HoodieActiveTimeline.createNewInstantTime()).collect(Collectors.toList());
       runNextDeltaCommits(writeClient, readClient, instants, records, cfg, true, new ArrayList<>());
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
 
       // Then: ensure no compaction is executedm since there are only 3 delta commits
       assertEquals(3, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
       // 4th commit, that will trigger compaction
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       String finalInstant = HoodieActiveTimeline.createNewInstantTime(20000);
       createNextDeltaCommit(finalInstant, dataGen.generateUpdates(finalInstant, 10), writeClient, metaClient, cfg, false);
 
-      metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       assertEquals(5, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     }
   }
@@ -183,12 +183,12 @@ public class TestInlineCompaction extends CompactionTestBase {
     HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(2, 60, CompactionTriggerStrategy.NUM_COMMITS);
     String instantTime3 = HoodieActiveTimeline.createNewInstantTime();
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) {
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       createNextDeltaCommit(instantTime3, dataGen.generateUpdates(instantTime3, 100), writeClient, metaClient, inlineCfg, false);
     }
 
     // Then: 1 delta commit is done, the failed compaction is retried
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     assertEquals(4, metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     assertEquals(instantTime2, metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
@@ -218,13 +218,13 @@ public class TestInlineCompaction extends CompactionTestBase {
     HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(5, 10, CompactionTriggerStrategy.TIME_ELAPSED);
     String instantTime2;
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) 
{
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
cfg.getBasePath());
+      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       instantTime2 = HoodieActiveTimeline.createNewInstantTime();
       createNextDeltaCommit(instantTime2, 
dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, 
false);
     }
 
     // Then: 1 delta commit is done, the failed compaction is retried
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     assertEquals(4, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     assertEquals(instantTime, 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
@@ -255,13 +255,13 @@ public class TestInlineCompaction extends 
CompactionTestBase {
     HoodieWriteConfig inlineCfg = getConfigForInlineCompaction(3, 20, 
CompactionTriggerStrategy.NUM_OR_TIME);
     String instantTime2;
     try (SparkRDDWriteClient<?> writeClient = getHoodieWriteClient(inlineCfg)) 
{
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
cfg.getBasePath());
+      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
       instantTime2 = HoodieActiveTimeline.createNewInstantTime();
       createNextDeltaCommit(instantTime2, 
dataGen.generateUpdates(instantTime2, 10), writeClient, metaClient, inlineCfg, 
false);
     }
 
     // Then: 1 delta commit is done, the failed compaction is retried
-    metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+    metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(cfg.getBasePath()).build();
     assertEquals(4, 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline().countInstants());
     assertEquals(instantTime, 
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().firstInstant().get().getTimestamp());
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 1caf9c0..1104631 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -220,7 +220,7 @@ public class HoodieClientTestBase extends 
HoodieClientTestHarness {
     return (commit, numRecords) -> {
       final SparkHoodieIndex index = SparkHoodieIndex.createIndex(writeConfig);
       List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
-      final HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(hadoopConf, basePath, true);
+      final HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, 
metaClient);
       JavaRDD<HoodieRecord> taggedRecords = 
index.tagLocation(jsc.parallelize(records, 1), context, table);
       return taggedRecords.collect();
@@ -241,7 +241,7 @@ public class HoodieClientTestBase extends 
HoodieClientTestHarness {
     return (numRecords) -> {
       final SparkHoodieIndex index = SparkHoodieIndex.createIndex(writeConfig);
       List<HoodieKey> records = keyGenFunction.apply(numRecords);
-      final HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(hadoopConf, basePath, true);
+      final HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
       HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, 
metaClient);
       JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
           .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
@@ -438,7 +438,7 @@ public class HoodieClientTestBase extends 
HoodieClientTestHarness {
     assertPartitionMetadataForRecords(records, fs);
 
     // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
basePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
 
     if (assertForCommit) {
@@ -506,7 +506,7 @@ public class HoodieClientTestBase extends 
HoodieClientTestHarness {
     assertPartitionMetadataForKeys(keysToDelete, fs);
 
     // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
basePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     HoodieTimeline timeline = new 
HoodieActiveTimeline(metaClient).getCommitTimeline();
 
     if (assertForCommit) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index e6523af..f3febab 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -346,7 +346,7 @@ public abstract class HoodieClientTestHarness extends 
HoodieCommonTestHarness im
   }
 
   public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String 
basePath) {
-    metaClient = new HoodieTableMetaClient(conf, basePath);
+    metaClient = 
HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
     return metaClient;
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
index c91b51b..55c5aa7 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java
@@ -151,7 +151,7 @@ public class HoodieClientTestUtils {
                                                 String... paths) {
     List<HoodieBaseFile> latestFiles = new ArrayList<>();
     try {
-      HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(fs.getConf(), basePath, true);
+      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
       for (String path : paths) {
         BaseFileOnlyView fileSystemView = new 
HoodieTableFileSystemView(metaClient,
             metaClient.getCommitsTimeline().filterCompletedInstants(), 
fs.globStatus(new Path(path)));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
index 5633551..5b37b3b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
@@ -66,7 +66,7 @@ public class HoodieMergeOnReadTestUtils {
   public static List<GenericRecord> getRecordsUsingInputFormat(Configuration 
conf, List<String> inputPaths, String basePath, JobConf jobConf, boolean 
realtime, Schema rawSchema,
                                                                String 
rawHiveColumnTypes, boolean projectCols, List<String> projectedColumns) {
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, 
basePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
     FileInputFormat inputFormat = 
HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(),
 realtime, jobConf);
 
     Schema schema = HoodieAvroUtils.addMetadataFields(rawSchema);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 32ed7d9..983678d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -94,26 +94,7 @@ public class HoodieTableMetaClient implements Serializable {
   private HoodieArchivedTimeline archivedTimeline;
   private ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
 
-  public HoodieTableMetaClient(Configuration conf, String basePath) {
-    // Do not load any timeline by default
-    this(conf, basePath, false);
-  }
-
-  public HoodieTableMetaClient(Configuration conf, String basePath, String 
payloadClassName) {
-    this(conf, basePath, false, ConsistencyGuardConfig.newBuilder().build(), 
Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION),
-        payloadClassName);
-  }
-
-  public HoodieTableMetaClient(Configuration conf, String basePath, boolean 
loadActiveTimelineOnLoad,
-                               ConsistencyGuardConfig consistencyGuardConfig, 
Option<TimelineLayoutVersion> layoutVersion) {
-    this(conf, basePath, loadActiveTimelineOnLoad, consistencyGuardConfig, 
layoutVersion, null);
-  }
-
-  public HoodieTableMetaClient(Configuration conf, String basePath, boolean 
loadActiveTimelineOnLoad) {
-    this(conf, basePath, loadActiveTimelineOnLoad, 
ConsistencyGuardConfig.newBuilder().build(), 
Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION), null);
-  }
-
-  public HoodieTableMetaClient(Configuration conf, String basePath, boolean 
loadActiveTimelineOnLoad,
+  private HoodieTableMetaClient(Configuration conf, String basePath, boolean 
loadActiveTimelineOnLoad,
                                ConsistencyGuardConfig consistencyGuardConfig, 
Option<TimelineLayoutVersion> layoutVersion,
                                String payloadClassName) {
     LOG.info("Loading HoodieTableMetaClient from " + basePath);
@@ -152,9 +133,8 @@ public class HoodieTableMetaClient implements Serializable {
   public HoodieTableMetaClient() {}
 
   public static HoodieTableMetaClient reload(HoodieTableMetaClient 
oldMetaClient) {
-    return new HoodieTableMetaClient(oldMetaClient.hadoopConf.get(), 
oldMetaClient.basePath,
-        oldMetaClient.loadActiveTimelineOnLoad, 
oldMetaClient.consistencyGuardConfig,
-        Option.of(oldMetaClient.timelineLayoutVersion), null);
+    return 
HoodieTableMetaClient.builder().setConf(oldMetaClient.hadoopConf.get()).setBasePath(oldMetaClient.basePath).setLoadActiveTimelineOnLoad(oldMetaClient.loadActiveTimelineOnLoad)
+        
.setConsistencyGuardConfig(oldMetaClient.consistencyGuardConfig).setLayoutVersion(Option.of(oldMetaClient.timelineLayoutVersion)).setPayloadClassName(null).build();
   }
 
   /**
@@ -471,7 +451,7 @@ public class HoodieTableMetaClient implements Serializable {
     HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);
     // We should not use fs.getConf as this might be different from the 
original configuration
     // used to create the fs in unit tests
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
basePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
     LOG.info("Finished initializing Table of type " + 
metaClient.getTableConfig().getTableType() + " from " + basePath);
     return metaClient;
   }
@@ -645,4 +625,59 @@ public class HoodieTableMetaClient implements Serializable 
{
   public void setActiveTimeline(HoodieActiveTimeline activeTimeline) {
     this.activeTimeline = activeTimeline;
   }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for {@link HoodieTableMetaClient}.
+   */
+  public static class Builder {
+
+    private Configuration conf;
+    private String basePath;
+    private boolean loadActiveTimelineOnLoad = false;
+    private String payloadClassName = null;
+    private ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
+    private Option<TimelineLayoutVersion> layoutVersion = 
Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION);
+
+    public Builder setConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder setBasePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder setLoadActiveTimelineOnLoad(boolean 
loadActiveTimelineOnLoad) {
+      this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
+      return this;
+    }
+
+    public Builder setPayloadClassName(String payloadClassName) {
+      this.payloadClassName = payloadClassName;
+      return this;
+    }
+
+    public Builder setConsistencyGuardConfig(ConsistencyGuardConfig 
consistencyGuardConfig) {
+      this.consistencyGuardConfig = consistencyGuardConfig;
+      return this;
+    }
+
+    public Builder setLayoutVersion(Option<TimelineLayoutVersion> 
layoutVersion) {
+      this.layoutVersion = layoutVersion;
+      return this;
+    }
+
+    public HoodieTableMetaClient build() {
+      ValidationUtils.checkArgument(conf != null, "Configuration needs to be 
set to init HoodieTableMetaClient");
+      ValidationUtils.checkArgument(basePath != null, "basePath needs to be 
set to init HoodieTableMetaClient");
+      return new HoodieTableMetaClient(conf, basePath,
+          loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, 
payloadClassName);
+    }
+  }
+
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 6fb0a05..02874e6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -109,7 +109,7 @@ public abstract class AbstractHoodieLogRecordScanner {
       String latestInstantTime, boolean readBlocksLazily, boolean 
reverseReader, int bufferSize) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
-    this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), 
basePath);
+    this.hoodieTableMetaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
     // load class from the payload fully qualified class name
     this.payloadClassFQN = 
this.hoodieTableMetaClient.getTableConfig().getPayloadClass();
     this.totalLogFiles.addAndGet(logFilePaths.size());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index ffc4b85..fe159df 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -64,7 +64,7 @@ public class LogReaderUtils {
 
   public static Schema readLatestSchemaFromLogFiles(String basePath, 
List<String> deltaFilePaths, Configuration config)
       throws IOException {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(config, 
basePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(config).setBasePath(basePath).build();
     List<String> deltaPaths = deltaFilePaths.stream().map(s -> new 
HoodieLogFile(new Path(s)))
         .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> 
s.getPath().toString())
         .collect(Collectors.toList());
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index 6f0e7d5..f614df5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -94,7 +94,7 @@ public class FileSystemViewManager {
    */
   public SyncableFileSystemView getFileSystemView(String basePath) {
     return globalViewMap.computeIfAbsent(basePath, (path) -> {
-      HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(conf.newCopy(), path);
+      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(conf.newCopy()).setBasePath(path).build();
       return viewCreator.apply(metaClient, viewStorageConfig);
     });
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index c86b37e..b4143f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -71,7 +71,7 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
     this.engineContext = engineContext;
     this.hadoopConf = new 
SerializableConfiguration(engineContext.getHadoopConf());
     this.datasetBasePath = datasetBasePath;
-    this.datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), 
datasetBasePath);
+    this.datasetMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(datasetBasePath).build();
     this.spillableMapDirectory = spillableMapDirectory;
     this.metadataConfig = metadataConfig;
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index a34652c..3285606 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -93,7 +93,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     if (enabled && this.metaClient == null) {
       this.metadataBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
       try {
-        this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), 
metadataBasePath);
+        this.metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build();
         HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
         latestFileSystemMetadataSlices = 
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
       } catch (TableNotFoundException e) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index d7e3bde..3ed111c 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -114,9 +114,10 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
         metaClient.getArchivePath(), 
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
     HoodieInstant instant6 = new HoodieInstant(State.REQUESTED, 
HoodieTimeline.COMPACTION_ACTION, "9");
     byte[] dummy = new byte[5];
-    HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(new 
HoodieTableMetaClient(metaClient.getHadoopConf(),
-        metaClient.getBasePath(), true, metaClient.getConsistencyGuardConfig(),
-        Option.of(new TimelineLayoutVersion(VERSION_0))));
+    HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(
+        
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath())
+            
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(metaClient.getConsistencyGuardConfig())
+            .setLayoutVersion(Option.of(new 
TimelineLayoutVersion(VERSION_0))).build());
     // Old Timeline writes both to aux and timeline folder
     oldTimeline.saveToCompactionRequested(instant6, Option.of(dummy));
     // Now use latest timeline version
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
index 146e0bb..0bcebaf 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java
@@ -324,7 +324,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     instantsToFiles = testMultipleWriteSteps(view1, 
Collections.singletonList("11"), true, "11");
 
     SyncableFileSystemView view2 =
-        getFileSystemView(new 
HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
 
     // Run 2 more ingestions on MOR table. View1 is not yet synced but View2 is
     instantsToFiles.putAll(testMultipleWriteSteps(view2, Arrays.asList("12", 
"13"), true, "11"));
@@ -334,7 +334,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
 
     view2.sync();
     SyncableFileSystemView view3 =
-        getFileSystemView(new 
HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
     view3.sync();
     areViewsConsistent(view1, view2, partitions.size() * 
fileIdsPerPartition.size());
 
@@ -346,7 +346,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     view1.sync();
     areViewsConsistent(view1, view2, partitions.size() * 
fileIdsPerPartition.size());
     SyncableFileSystemView view4 =
-        getFileSystemView(new 
HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
     view4.sync();
 
     /*
@@ -360,7 +360,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     view1.sync();
     areViewsConsistent(view1, view2, partitions.size() * 
fileIdsPerPartition.size() * 2);
     SyncableFileSystemView view5 =
-        getFileSystemView(new 
HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
     view5.sync();
 
     /*
@@ -383,7 +383,7 @@ public class TestIncrementalFSViewSync extends 
HoodieCommonTestHarness {
     view1.sync();
     areViewsConsistent(view1, view2, partitions.size() * 
fileIdsPerPartition.size() * 2);
     SyncableFileSystemView view6 =
-        getFileSystemView(new 
HoodieTableMetaClient(metaClient.getHadoopConf(), metaClient.getBasePath()));
+        
getFileSystemView(HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).build());
     view6.sync();
 
     /*
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
index 44e3da0..fb5f123 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/CompactionTestUtils.java
@@ -110,7 +110,7 @@ public class CompactionTestUtils {
       }
     });
 
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), 
metaClient.getBasePath(), true);
+    metaClient = 
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(metaClient.getBasePath()).setLoadActiveTimelineOnLoad(true).build();
     Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> 
pendingCompactionMap =
         CompactionUtils.getAllPendingCompactionOperations(metaClient);
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index bca91f8..2cca148 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -224,7 +224,7 @@ public class FileCreateUtils {
   public static Map<String, Long> getBaseFileCountsForPaths(String basePath, 
FileSystem fs, String... paths) {
     Map<String, Long> toReturn = new HashMap<>();
     try {
-      HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(fs.getConf(), basePath, true);
+      HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
       for (String path : paths) {
         TableFileSystemView.BaseFileOnlyView fileSystemView = new 
HoodieTableFileSystemView(metaClient,
             metaClient.getCommitsTimeline().filterCompletedInstants(), 
fs.globStatus(new org.apache.hadoop.fs.Path(path)));
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 25b2c8b..9738816 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -83,7 +83,7 @@ public class HoodieCommonTestHarness {
   }
 
   protected void refreshFsView() throws IOException {
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), 
basePath, true);
+    metaClient = 
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
   }
 
   protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) 
throws IOException {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
index 35ff4cb..92c40c7 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCompactionUtils.java
@@ -188,7 +188,7 @@ public class TestCompactionUtils extends 
HoodieCommonTestHarness {
     // schedule similar plan again so that there will be duplicates
     plan1.getOperations().get(0).setDataFilePath("bla");
     scheduleCompaction(metaClient, "005", plan1);
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), 
basePath, true);
+    metaClient = 
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     assertThrows(IllegalStateException.class, () -> {
       CompactionUtils.getAllPendingCompactionOperations(metaClient);
     });
@@ -203,7 +203,7 @@ public class TestCompactionUtils extends 
HoodieCommonTestHarness {
     scheduleCompaction(metaClient, "003", plan2);
     // schedule same plan again so that there will be duplicates. It should 
not fail as it is a full duplicate
     scheduleCompaction(metaClient, "005", plan1);
-    metaClient = new HoodieTableMetaClient(metaClient.getHadoopConf(), 
basePath, true);
+    metaClient = 
HoodieTableMetaClient.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> res =
         CompactionUtils.getAllPendingCompactionOperations(metaClient);
   }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index da45fa6..d94018b 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -171,7 +171,7 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
         try {
           HoodieTableMetaClient metaClient = 
metaClientCache.get(baseDir.toString());
           if (null == metaClient) {
-            metaClient = new HoodieTableMetaClient(fs.getConf(), 
baseDir.toString(), true);
+            metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).setLoadActiveTimelineOnLoad(true).build();
             metaClientCache.put(baseDir.toString(), metaClient);
           }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 050b91a..f378f44 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -71,7 +71,7 @@ public abstract class AbstractRealtimeRecordReader {
   }
 
   private boolean usesCustomPayload() {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, 
split.getBasePath());
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
     return 
!(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
         || 
metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload"));
   }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index 9f98136..eac9f4d 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -324,7 +324,7 @@ public class HoodieInputFormatUtils {
     }
     Path baseDir = HoodieHiveUtils.getNthParent(dataPath, levels);
     LOG.info("Reading hoodie metadata from path " + baseDir.toString());
-    return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
+    return 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir.toString()).build();
   }
 
   public static FileStatus getFileStatus(HoodieBaseFile baseFile) throws 
IOException {
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
index 7c9090d..8043066 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java
@@ -45,8 +45,9 @@ public class CompactNode extends 
DagNode<JavaRDD<WriteStatus>> {
    */
   @Override
   public void execute(ExecutionContext executionContext, int curItrCount) 
throws Exception {
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
-        executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
+    HoodieTableMetaClient metaClient =
+        
HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath)
+            .build();
     Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline()
         
.getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().lastInstant();
     if (lastInstant.isPresent()) {
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
index 1824cb8..c8cb628 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java
@@ -49,8 +49,9 @@ public class RollbackNode extends 
DagNode<Option<HoodieInstant>> {
     log.info("Executing rollback node {}", this.getName());
     // Can only be done with an instantiation of a new WriteClient hence 
cannot be done during DeltaStreamer
     // testing for now
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
-        executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
+    HoodieTableMetaClient metaClient =
+        
HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath)
+            .build();
     Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
     if (lastInstant.isPresent()) {
       log.info("Rolling back last instant {}", lastInstant.get());
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
index c54b25a..62bf9b0 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java
@@ -41,8 +41,9 @@ public class ScheduleCompactNode extends 
DagNode<Option<String>> {
     // testing for now
     // Find the last commit and extract the extra metadata to be passed to the 
schedule compaction. This is
     // done to ensure the CHECKPOINT is correctly passed from commit to commit
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(),
-        executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath);
+    HoodieTableMetaClient metaClient =
+        
HoodieTableMetaClient.builder().setConf(executionContext.getHoodieTestSuiteWriter().getConfiguration()).setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath)
+            .build();
     Option<HoodieInstant> lastInstant = 
metaClient.getActiveTimeline().getCommitsTimeline().lastInstant();
     if (lastInstant.isPresent()) {
       HoodieCommitMetadata metadata = 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(metaClient
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 136aa27..1acbc90 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -37,7 +37,6 @@ import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
@@ -80,7 +79,7 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
   public DFSHoodieDatasetInputReader(JavaSparkContext jsc, String basePath, 
String schemaStr) {
     this.jsc = jsc;
     this.schemaStr = schemaStr;
-    this.metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
basePath);
+    this.metaClient = 
HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
   }
 
   protected List<String> getPartitions(Option<Integer> partitionsLimit) throws 
IOException {
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
index b14ef18..6232b1d 100644
--- 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/job/TestHoodieTestSuiteJob.java
@@ -173,7 +173,7 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     cfg.workloadDagGenerator = ComplexDagGenerator.class.getName();
     HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
     hoodieTestSuiteJob.runTestSuite();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new 
Configuration(), cfg.targetBasePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new 
Configuration()).setBasePath(cfg.targetBasePath).build();
     
assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(),
 2);
   }
 
@@ -192,7 +192,7 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     }
     HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
     hoodieTestSuiteJob.runTestSuite();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new 
Configuration(), cfg.targetBasePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new 
Configuration()).setBasePath(cfg.targetBasePath).build();
     
assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(),
 1);
   }
 
@@ -207,7 +207,7 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     cfg.workloadYamlPath = dfsBasePath + "/" + COW_DAG_FILE_NAME;
     HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
     hoodieTestSuiteJob.runTestSuite();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new 
Configuration(), cfg.targetBasePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new 
Configuration()).setBasePath(cfg.targetBasePath).build();
     
//assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(),
 5);
   }
 
@@ -222,7 +222,7 @@ public class TestHoodieTestSuiteJob extends 
UtilitiesTestBase {
     cfg.workloadYamlPath = dfsBasePath + "/" + MOR_DAG_FILE_NAME;
     HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
     hoodieTestSuiteJob.runTestSuite();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(new 
Configuration(), cfg.targetBasePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(new 
Configuration()).setBasePath(cfg.targetBasePath).build();
     
//assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(),
 7);
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index b40d36b..7b04a65 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -63,7 +63,7 @@ public class DataSourceInternalWriterHelper {
     this.writeClient  = new SparkRDDWriteClient<>(new 
HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), 
writeConfig, true);
     writeClient.setOperationType(operationType);
     writeClient.startCommitWithTime(instantTime);
-    this.metaClient = new HoodieTableMetaClient(configuration, 
writeConfig.getBasePath());
+    this.metaClient = 
HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
     this.hoodieTable = HoodieSparkTable.create(writeConfig, new 
HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), 
metaClient);
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
index 734e0c0..ce80b52 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
@@ -70,7 +70,7 @@ public class HoodieDataSourceHelpers {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   public static HoodieTimeline allCompletedCommitsCompactions(FileSystem fs, 
String basePath) {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), 
basePath, true);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     if (metaClient.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
       return metaClient.getActiveTimeline().getTimelineOfActions(
           CollectionUtils.createSet(HoodieActiveTimeline.COMMIT_ACTION,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index ef9bf8c..3299b8f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -84,7 +84,7 @@ class DefaultSource extends RelationProvider
     val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
     log.info("Obtained hudi table path: " + tablePath)
 
-    val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+    val metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
     val isBootstrappedTable = 
metaClient.getTableConfig.getBootstrapBasePath.isPresent
     log.info("Is bootstrapped table => " + isBootstrappedTable)
 
@@ -104,7 +104,7 @@ class DefaultSource extends RelationProvider
     } else 
if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
       getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, 
isBootstrappedTable, globPaths, metaClient)
     } else if 
(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
-      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+      val metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
       if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
         new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, 
metaClient)
       } else {
@@ -202,8 +202,8 @@ class DefaultSource extends RelationProvider
     if (path.isEmpty || path.get == null) {
       throw new HoodieException(s"'path' must be specified.")
     }
-    val metaClient = new HoodieTableMetaClient(
-      sqlContext.sparkSession.sessionState.newHadoopConf(), path.get)
+    val metaClient = HoodieTableMetaClient.builder().setConf(
+      
sqlContext.sparkSession.sessionState.newHadoopConf()).setBasePath(path.get).build()
     val schemaResolver = new TableSchemaResolver(metaClient)
     val sqlSchema =
       try {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ad185cb..f5ba6c8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -500,7 +500,8 @@ private[hudi] object HoodieSparkSqlWriter {
                                    hoodieTableConfigOpt: 
Option[HoodieTableConfig]): HoodieTableConfig = {
     if (tableExists) {
       hoodieTableConfigOpt.getOrElse(
-        new HoodieTableMetaClient(sparkContext.hadoopConfiguration, 
tablePath).getTableConfig)
+        
HoodieTableMetaClient.builder().setConf(sparkContext.hadoopConfiguration).setBasePath(tablePath)
+          .build().getTableConfig)
     } else {
       null
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 846212d..f9a799e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -175,8 +175,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       }))
 
       // First time, scan .hoodie folder and get all pending compactions
-      val metaClient = new 
HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration,
-        client.getConfig.getBasePath)
+      val metaClient = 
HoodieTableMetaClient.builder().setConf(sqlContext.sparkContext.hadoopConfiguration)
+        .setBasePath(client.getConfig.getBasePath).build()
       val pendingInstants :java.util.List[HoodieInstant] =
         CompactionUtils.getPendingCompactionInstantTimes(metaClient)
       pendingInstants.foreach((h : HoodieInstant) => 
asyncCompactorService.enqueuePendingCompaction(h))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index c17598a..b0e3163 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -59,7 +59,7 @@ class HoodieStreamSource(
     val fs = path.getFileSystem(hadoopConf)
     TablePathUtils.getTablePath(fs, path).get()
   }
-  private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, 
tablePath.toString)
+  private lazy val metaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(tablePath.toString).build()
   private lazy val tableType = metaClient.getTableType
 
   @transient private var lastOffset: HoodieSourceOffset = _
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java 
b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 1df12a3..3b55434 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -187,7 +187,7 @@ public class HoodieJavaStreamingApp {
       executor.shutdownNow();
     }
 
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jssc.hadoopConfiguration(), tablePath);
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(tablePath).build();
     if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
       // Ensure we have successfully completed one compaction commit
       
ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().getInstants().count()
 == 1);
@@ -249,7 +249,7 @@ public class HoodieJavaStreamingApp {
         if (timeline.countInstants() >= numCommits) {
           return;
         }
-        HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(fs.getConf(), tablePath, true);
+        HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build();
         System.out.println("Instants :" + 
metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
       } catch (TableNotFoundException te) {
         LOG.info("Got table not found exception. Retrying");
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 8fdb02d..856cc00 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -210,7 +210,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .mode(SaveMode.Append)
       .save(basePath)
 
-    val metaClient = new 
HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val metaClient = 
HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+    .setLoadActiveTimelineOnLoad(true).build()
     val commits = 
metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
     assertEquals(2, commits.size)
@@ -235,7 +236,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
-    val metaClient = new 
HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val metaClient = 
HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+    .setLoadActiveTimelineOnLoad(true).build()
     val commits = 
metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => (instant.asInstanceOf[HoodieInstant]).getAction)
     assertEquals(2, commits.size)
@@ -289,7 +291,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => 
row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size
     assertEquals(7, filterSecondPartitionCount)
 
-    val metaClient = new 
HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val metaClient = 
HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+    .setLoadActiveTimelineOnLoad(true).build()
     val commits = 
metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
     assertEquals(3, commits.size)
@@ -339,7 +342,8 @@ class TestCOWDataSource extends HoodieClientTestBase {
     val filterSecondPartitionCount = recordsForPartitionColumn.filter(row => 
row.get(0).equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)).size
     assertEquals(7, filterSecondPartitionCount)
 
-    val metaClient = new 
HoodieTableMetaClient(spark.sparkContext.hadoopConfiguration, basePath, true)
+    val metaClient = 
HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
+    .setLoadActiveTimelineOnLoad(true).build()
     val commits = 
metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
       .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
     assertEquals(2, commits.size)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 08e1c82..49dff4d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -177,7 +177,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
         numInstants = timeline.countInstants
         success = true
       }
-      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true)
+      val metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath)
+      .setLoadActiveTimelineOnLoad(true).build()
     } catch {
       case te: TableNotFoundException =>
         log.info("Got table not found exception. Retrying")
@@ -253,12 +254,14 @@ class TestStructuredStreaming extends 
HoodieClientTestBase {
       if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, 
destPath).getCompletedReplaceTimeline().countInstants() > 0) {
         assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, 
"000").size())
         // check have at least one file group
-        this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
+        this.metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
+        .setLoadActiveTimelineOnLoad(true).build()
         assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
       } else {
         assertEquals(currNumCommits, 
HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
         // check have more than one file group
-        this.metaClient = new HoodieTableMetaClient(fs.getConf, destPath, true)
+        this.metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
+        .setLoadActiveTimelineOnLoad(true).build()
         assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
       }
 
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
index 8d03252..8477ed6 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
@@ -53,7 +53,7 @@ public abstract class AbstractSyncHoodieClient {
 
   public AbstractSyncHoodieClient(String basePath, boolean 
assumeDatePartitioning, boolean useFileListingFromMetadata,
                                   boolean verifyMetadataFileListing, 
FileSystem fs) {
-    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
     this.tableType = metaClient.getTableType();
     this.basePath = basePath;
     this.assumeDatePartitioning = assumeDatePartitioning;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
index 7d356a5..a3570b1 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
@@ -276,7 +276,7 @@ public class HiveIncrementalPuller {
     if (!fs.exists(new Path(targetDataPath)) || !fs.exists(new 
Path(targetDataPath + "/.hoodie"))) {
       return "0";
     }
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), 
targetDataPath);
+    HoodieTableMetaClient metadata = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(targetDataPath).build();
 
     Option<HoodieInstant> lastCommit =
         
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
@@ -309,7 +309,7 @@ public class HiveIncrementalPuller {
   }
 
   private String getLastCommitTimePulled(FileSystem fs, String sourceTableLocation) {
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(fs.getConf(), sourceTableLocation);
+    HoodieTableMetaClient metadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(sourceTableLocation).build();
     List<String> commitsToSync = metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
         .findInstantsAfter(config.fromCommitTime, config.maxCommits).getInstants().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
index 394771c..44328d3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieClusteringJob.java
@@ -137,7 +137,7 @@ public class HoodieClusteringJob {
   }
 
   private String getSchemaFromLatestInstant() throws Exception {
-    HoodieTableMetaClient metaClient =  new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath, true);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build();
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
    if (metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants() == 0) {
      throw new HoodieException("Cannot run clustering without any completed commits");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
index d3e4dba..d7642c4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactionAdminTool.java
@@ -60,7 +60,7 @@ public class HoodieCompactionAdminTool {
    * Executes one of compaction admin operations.
    */
   public void run(JavaSparkContext jsc) throws Exception {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.basePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.basePath).build();
    try (CompactionAdminClient admin = new CompactionAdminClient(new HoodieSparkEngineContext(jsc), cfg.basePath)) {
      final FileSystem fs = FSUtils.getFs(cfg.basePath, jsc.hadoopConfiguration());
       if (cfg.outputPath != null && fs.exists(new Path(cfg.outputPath))) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index ece9b8c..72d1dbd 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -83,7 +83,7 @@ public class HoodieSnapshotCopier implements Serializable {
                       final boolean verifyMetadataFileListing) throws IOException {
     FileSystem fs = FSUtils.getFs(baseDir, jsc.hadoopConfiguration());
    final SerializableConfiguration serConf = new SerializableConfiguration(jsc.hadoopConfiguration());
-    final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), baseDir);
+    final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(baseDir).build();
    final BaseFileOnlyView fsView = new HoodieTableFileSystemView(tableMetadata,
        tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants());
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index b9f32cb..8e792a0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -148,7 +148,7 @@ public class HoodieSnapshotExporter {
   }
 
   private Option<String> getLatestCommitTimestamp(FileSystem fs, Config cfg) {
-    final HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+    final HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
    Option<HoodieInstant> latestCommit = tableMetadata.getActiveTimeline().getCommitsAndCompactionTimeline()
         .filterCompletedInstants().lastInstant();
    return latestCommit.isPresent() ? Option.of(latestCommit.get().getTimestamp()) : Option.empty();
@@ -259,7 +259,7 @@ public class HoodieSnapshotExporter {
 
  private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
-    HoodieTableMetaClient tableMetadata = new HoodieTableMetaClient(fs.getConf(), cfg.sourceBasePath);
+    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
    return new HoodieTableFileSystemView(tableMetadata, tableMetadata
        .getActiveTimeline().getCommitsAndCompactionTimeline().filterCompletedInstants());
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java
index 17058da..e2554c4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckpointFromAnotherHoodieTimelineProvider.java
@@ -44,7 +44,7 @@ public class InitialCheckpointFromAnotherHoodieTimelineProvider extends InitialC
   @Override
   public void init(Configuration config) throws HoodieException {
     super.init(config);
-    this.anotherDsHoodieMetaclient = new HoodieTableMetaClient(config, path.toString());
+    this.anotherDsHoodieMetaclient = HoodieTableMetaClient.builder().setConf(config).setBasePath(path.toString()).build();
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 9eb5c9d..4c494dc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -223,8 +223,7 @@ public class DeltaSync implements Serializable {
    */
   public void refreshTimeline() throws IOException {
     if (fs.exists(new Path(cfg.targetBasePath))) {
-      HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath,
-          cfg.payloadClassName);
+      HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setPayloadClassName(cfg.payloadClassName).build();
       switch (meta.getTableType()) {
         case COPY_ON_WRITE:
          this.commitTimelineOpt = Option.of(meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 10d9453..6e3a024 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -528,7 +528,7 @@ public class HoodieDeltaStreamer implements Serializable {
 
       if (fs.exists(new Path(cfg.targetBasePath))) {
         HoodieTableMetaClient meta =
-            new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath, false);
+            HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(false).build();
         tableType = meta.getTableType();
        // This will guarantee there is no surprise with table type
        ValidationUtils.checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)),
@@ -636,7 +636,7 @@ public class HoodieDeltaStreamer implements Serializable {
          asyncCompactService = Option.ofNullable(new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient));
           // Enqueue existing pending compactions first
           HoodieTableMetaClient meta =
-              new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
+              HoodieTableMetaClient.builder().setConf(new Configuration(jssc.hadoopConfiguration())).setBasePath(cfg.targetBasePath).setLoadActiveTimelineOnLoad(true).build();
          List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
          pending.forEach(hoodieInstant -> asyncCompactService.get().enqueuePendingCompaction(hoodieInstant));
           asyncCompactService.get().start((error) -> {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index d296f0e..184d7c7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -100,7 +100,7 @@ public class TimelineServerPerf implements Serializable {
       this.hostAddr = cfg.serverHost;
     }
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(timelineServer.getConf(), cfg.basePath, true);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(timelineServer.getConf()).setBasePath(cfg.basePath).setLoadActiveTimelineOnLoad(true).build();
    SyncableFileSystemView fsView = new RemoteHoodieTableFileSystemView(this.hostAddr, cfg.serverPort, metaClient);
 
     String reportDir = cfg.reportDir;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index 96dc648..efe3a86 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -58,7 +58,7 @@ public class IncrSourceHelper {
      int numInstantsPerFetch, Option<String> beginInstant, boolean readLatestOnMissingBeginInstant) {
    ValidationUtils.checkArgument(numInstantsPerFetch > 0,
        "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
-    HoodieTableMetaClient srcMetaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), srcBasePath, true);
+    HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
 
    final HoodieTimeline activeCommitTimeline =
        srcMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index eeef8ed..616d039 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -363,7 +363,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     }
 
    static void assertAtleastNCompactionCommits(int minExpected, String tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
+      HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
      HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
       int numCompactionCommits = (int) timeline.getInstants().count();
@@ -371,7 +371,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     }
 
    static void assertAtleastNDeltaCommits(int minExpected, String tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
+      HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
      HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
       int numDeltaCommits = (int) timeline.getInstants().count();
@@ -380,7 +380,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
    static String assertCommitMetadata(String expected, String tablePath, FileSystem fs, int totalCommits)
         throws IOException {
-      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
+      HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
      HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
       HoodieInstant lastInstant = timeline.lastInstant().get();
       HoodieCommitMetadata commitMetadata =
@@ -408,7 +408,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     }
 
    static void assertAtLeastNCommits(int minExpected, String tablePath, FileSystem fs) {
-      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), tablePath);
+      HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(tablePath).build();
      HoodieTimeline timeline = meta.getActiveTimeline().filterCompletedInstants();
      LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
       int numDeltaCommits = (int) timeline.getInstants().count();
@@ -683,13 +683,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
    cfg.configs.add(String.format("%s=%s", HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "2"));
     HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
     deltaStreamerTestRunner(ds, cfg, (r) -> {
-      HoodieTableMetaClient metaClient =  new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
      int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
      int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
      LOG.info("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
       return completeReplaceSize > 0;
     });
-    HoodieTableMetaClient metaClient =  new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
    assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
   }
 
@@ -739,13 +739,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       } else {
         LOG.warn("Schedule clustering failed");
       }
-      HoodieTableMetaClient metaClient =  new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
      int pendingReplaceSize = metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().toArray().length;
      int completeReplaceSize = metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length;
      System.out.println("PendingReplaceSize=" + pendingReplaceSize + ",completeReplaceSize = " + completeReplaceSize);
       return completeReplaceSize > 0;
     });
-    HoodieTableMetaClient metaClient =  new HoodieTableMetaClient(this.dfs.getConf(), tableBasePath, true);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(this.dfs.getConf()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
    assertEquals(1, metaClient.getActiveTimeline().getCompletedReplaceTimeline().getInstants().toArray().length);
   }
 
@@ -921,7 +921,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     assertEquals(1000, counts.get(1).getLong(1));
 
     // Test with empty commits
-    HoodieTableMetaClient mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
+    HoodieTableMetaClient mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
    HoodieInstant lastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
    HoodieDeltaStreamer.Config cfg2 = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
     cfg2.filterDupes = false;
@@ -930,7 +930,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
    cfg2.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
     HoodieDeltaStreamer ds2 = new HoodieDeltaStreamer(cfg2, jsc);
     ds2.sync();
-    mClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), tableBasePath, true);
+    mClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).setLoadActiveTimelineOnLoad(true).build();
    HoodieInstant newLastFinished = mClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get();
    assertTrue(HoodieTimeline.compareTimestamps(newLastFinished.getTimestamp(), HoodieTimeline.GREATER_THAN, lastFinished.getTimestamp()
     ));
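
Note: every hunk in this diff applies the same mechanical migration from the positional HoodieTableMetaClient constructors to the fluent builder this commit introduces. A minimal before/after sketch, assuming a Hadoop Configuration named conf and a table base path named basePath are in scope (both are illustrative placeholders, not identifiers taken from the diff):

    // Before this change: positional constructor arguments
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf, basePath, true);

    // After this change: the builder, as used at every call site above
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(conf)                       // Hadoop Configuration
        .setBasePath(basePath)               // table base path
        .setLoadActiveTimelineOnLoad(true)   // maps the old boolean third argument; omitted where the two-argument constructor was used
        .build();

Call sites that previously passed a payload class name as the third argument use setPayloadClassName(...) instead, as the DeltaSync hunk shows.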
