This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bc883db  [HUDI-1636] Support Builder Pattern To Build Table Properties 
For HoodieTableConfig (#2596)
bc883db is described below

commit bc883db5de5832fa429bbb04a35d3606fdacdb2a
Author: pengzhiwei <[email protected]>
AuthorDate: Fri Mar 5 14:10:27 2021 +0800

    [HUDI-1636] Support Builder Pattern To Build Table Properties For 
HoodieTableConfig (#2596)
---
 .../org/apache/hudi/cli/commands/TableCommand.java |  12 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  11 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  23 +-
 .../java/org/apache/hudi/client/TestMultiFS.java   |  14 +-
 .../hudi/client/TestTableSchemaEvolution.java      |  15 +-
 .../hudi/testutils/FunctionalTestHarness.java      |  12 +-
 .../hudi/common/table/HoodieTableMetaClient.java   | 276 +++++++++++++--------
 .../table/timeline/TestHoodieActiveTimeline.java   |   8 +-
 .../hudi/common/testutils/HoodieTestUtils.java     |   9 +-
 .../java/HoodieJavaWriteClientExample.java         |   7 +-
 .../examples/spark/HoodieWriteClientExample.java   |   7 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  16 +-
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |   8 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  26 +-
 .../hudi/functional/TestStreamingSource.scala      |  14 +-
 .../apache/hudi/hive/testutils/HiveTestUtil.java   |  22 +-
 .../apache/hudi/utilities/HDFSParquetImporter.java |   8 +-
 .../utilities/deltastreamer/BootstrapExecutor.java |  14 +-
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  20 +-
 .../functional/TestHoodieSnapshotExporter.java     |   9 +-
 .../utilities/testutils/UtilitiesTestBase.java     |   7 +-
 21 files changed, 341 insertions(+), 197 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index 168de26..d25e0c8 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -22,7 +22,6 @@ import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.exception.TableNotFoundException;
 
@@ -106,10 +105,13 @@ public class TableCommand implements CommandMarker {
       throw new IllegalStateException("Table already existing in path : " + 
path);
     }
 
-    final HoodieTableType tableType = HoodieTableType.valueOf(tableTypeStr);
-    HoodieTableMetaClient.initTableType(HoodieCLI.conf, path, tableType, name, 
archiveFolder,
-        payloadClass, layoutVersion);
-
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(tableTypeStr)
+      .setTableName(name)
+      .setArchiveLogFolder(archiveFolder)
+      .setPayloadClassName(payloadClass)
+      .setTimelineLayoutVersion(layoutVersion)
+      .initTable(HoodieCLI.conf, path);
     // Now connect to ensure loading works
     return connect(path, layoutVersion, false, 0, 0, 0);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 6629410..dbd678f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -288,9 +288,14 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     String createInstantTime = 
latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
     LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
 
-    HoodieTableMetaClient.initTableType(hadoopConf.get(), 
metadataWriteConfig.getBasePath(),
-        HoodieTableType.MERGE_ON_READ, tableName, "archived", 
HoodieMetadataPayload.class.getName(),
-        HoodieFileFormat.HFILE.toString());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(HoodieTableType.MERGE_ON_READ)
+      .setTableName(tableName)
+      .setArchiveLogFolder("archived")
+      .setPayloadClassName(HoodieMetadataPayload.class.getName())
+      .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
+      .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
+
     initTableMetadata();
 
     // List all partitions in the basePath of the containing dataset
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 9383854..41ee4b2 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -349,9 +349,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .withProps(config.getProps()).withTimelineLayoutVersion(
         VERSION_0).build();
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(), metaClient.getTableType(),
-        metaClient.getTableConfig().getTableName(), 
metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
+
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Write 1 (only inserts)
@@ -493,10 +495,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
         
.withProps(config.getProps()).withMergeAllowDuplicateOnInserts(true).withTimelineLayoutVersion(
             VERSION_0).build();
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(), metaClient.getTableType(),
-        metaClient.getTableConfig().getTableName(), 
metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
 
     // Write 1 (only inserts)
@@ -629,9 +632,11 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
             .withBloomIndexUpdatePartitionPath(true)
             .withGlobalSimpleIndexUpdatePartitionPath(true)
             .build()).withTimelineLayoutVersion(VERSION_0).build();
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(),
-        metaClient.getTableType(), metaClient.getTableConfig().getTableName(), 
metaClient.getArchivePath(),
-        metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
     // Set rollback to LAZY so no inflights are deleted
     
hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP,
         HoodieFailedWritesCleaningPolicy.LAZY.name());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index b0bd54f..83761c9 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -75,8 +75,11 @@ public class TestMultiFS extends HoodieClientTestHarness {
   @Test
   public void readLocalWriteHDFS() throws Exception {
     // Initialize table and filesystem
-    HoodieTableMetaClient.initTableType(hadoopConf, dfsBasePath, 
HoodieTableType.valueOf(tableType),
-        tableName, HoodieAvroPayload.class.getName());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(tableType)
+      .setTableName(tableName)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .initTable(hadoopConf, dfsBasePath);
 
     // Create write client to write some records in
     HoodieWriteConfig cfg = getHoodieWriteConfig(dfsBasePath);
@@ -100,8 +103,11 @@ public class TestMultiFS extends HoodieClientTestHarness {
       assertEquals(readRecords.count(), records.size(), "Should contain 100 
records");
 
       // Write to local
-      HoodieTableMetaClient.initTableType(hadoopConf, tablePath, 
HoodieTableType.valueOf(tableType),
-          tableName, HoodieAvroPayload.class.getName());
+      HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(tableType)
+        .setTableName(tableName)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .initTable(hadoopConf, tablePath);
 
       String writeCommitTime = localWriteClient.startCommit();
       LOG.info("Starting write commit " + writeCommitTime);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 9bcacc9..7065247 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -150,9 +150,11 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
     tableType = HoodieTableType.MERGE_ON_READ;
 
     // Create the table
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(),
-        HoodieTableType.MERGE_ON_READ, 
metaClient.getTableConfig().getTableName(),
-        metaClient.getArchivePath(), 
metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTableType(HoodieTableType.MERGE_ON_READ)
+      .setTimelineLayoutVersion(VERSION_1)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
     HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
@@ -295,9 +297,10 @@ public class TestTableSchemaEvolution extends 
HoodieClientTestBase {
   @Test
   public void testCopyOnWriteTable() throws Exception {
     // Create the table
-    HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(),
-        HoodieTableType.COPY_ON_WRITE, 
metaClient.getTableConfig().getTableName(),
-        metaClient.getArchivePath(), 
metaClient.getTableConfig().getPayloadClass(), VERSION_1);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_1)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
 
     HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA);
     SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 1db1b7f..fc02e6d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -24,7 +24,6 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -117,10 +116,13 @@ public class FunctionalTestHarness implements 
SparkProvider, DFSProvider, Hoodie
 
   @Override
   public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, 
String basePath, Properties props) throws IOException {
-    props.putIfAbsent(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, 
PARQUET.toString());
-    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, 
RAW_TRIPS_TEST_NAME);
-    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, 
COPY_ON_WRITE.name());
-    props.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, 
HoodieAvroPayload.class.getName());
+    props = HoodieTableMetaClient.withPropertyBuilder()
+      .setTableName(RAW_TRIPS_TEST_NAME)
+      .setTableType(COPY_ON_WRITE)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .setBaseFileFormat(PARQUET.toString())
+      .fromProperties(props)
+      .build();
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, 
basePath, props);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 8aa0a3d..5de3b9a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.fs.NoOpConsistencyGuard;
+import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
@@ -310,112 +311,6 @@ public class HoodieTableMetaClient implements 
Serializable {
   }
 
   /**
-   * Helper method to initialize a table, with given basePath, tableType, 
name, archiveFolder, payloadClass and
-   * base file format.
-   */
-  public static HoodieTableMetaClient initTableTypeWithBootstrap(Configuration 
hadoopConf, String basePath, HoodieTableType tableType,
-                                                                 String 
tableName, String archiveLogFolder, String payloadClassName,
-                                                                 String 
baseFileFormat, String preCombineField, String bootstrapIndexClass,
-                                                                 String 
bootstrapBasePath) throws IOException {
-    return initTableType(hadoopConf, basePath, tableType, tableName,
-        archiveLogFolder, payloadClassName, null,
-      baseFileFormat, preCombineField, bootstrapIndexClass, bootstrapBasePath);
-  }
-
-  public static HoodieTableMetaClient initTableTypeWithBootstrap(Configuration 
hadoopConf, String basePath, HoodieTableType tableType,
-                                                                 String 
tableName, String archiveLogFolder, String payloadClassName,
-                                                                 String 
baseFileFormat, String bootstrapIndexClass,
-                                                                 String 
bootstrapBasePath) throws IOException {
-    return initTableType(hadoopConf, basePath, tableType, tableName,
-      archiveLogFolder, payloadClassName, null,
-      baseFileFormat, null, bootstrapIndexClass, bootstrapBasePath);
-  }
-
-  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, 
String basePath, HoodieTableType tableType,
-                                                    String tableName, String 
archiveLogFolder, String payloadClassName,
-                                                    String baseFileFormat, 
String preCombineField) throws IOException {
-    return initTableType(hadoopConf, basePath, tableType, tableName,
-        archiveLogFolder, payloadClassName, null, baseFileFormat, 
preCombineField,
-       null, null);
-  }
-
-  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, 
String basePath, HoodieTableType tableType,
-                                                    String tableName, String 
archiveLogFolder, String payloadClassName,
-                                                    String baseFileFormat) 
throws IOException {
-    return initTableType(hadoopConf, basePath, tableType, tableName,
-      archiveLogFolder, payloadClassName, null, baseFileFormat, null,
-      null, null);
-  }
-
-  /**
-   * Used primarily by tests, examples.
-   */
-  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, 
String basePath, HoodieTableType tableType,
-                                                    String tableName, String 
payloadClassName, String preCombineField) throws IOException {
-    return initTableType(hadoopConf, basePath, tableType, tableName, null, 
payloadClassName,
-        null, preCombineField);
-  }
-
-  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, 
String basePath, HoodieTableType tableType,
-                                                    String tableName, String 
payloadClassName) throws IOException {
-    return initTableType(hadoopConf, basePath, tableType, tableName, null, 
payloadClassName,
-      null, (String) null);
-  }
-
-  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, 
String basePath, HoodieTableType tableType,
-                                                    String tableName, String 
archiveLogFolder, String payloadClassName,
-                                                    String preCombineField, 
Integer timelineLayoutVersion) throws IOException {
-    return initTableType(hadoopConf, basePath, tableType, tableName, 
archiveLogFolder, payloadClassName,
-        timelineLayoutVersion, null, preCombineField, null, null);
-  }
-
-  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, 
String basePath, HoodieTableType tableType,
-                                                    String tableName, String 
archiveLogFolder, String payloadClassName,
-                                                    Integer 
timelineLayoutVersion) throws IOException {
-    return initTableType(hadoopConf, basePath, tableType, tableName, 
archiveLogFolder, payloadClassName,
-      timelineLayoutVersion, null, null, null, null);
-  }
-
-  private static HoodieTableMetaClient initTableType(Configuration hadoopConf, 
String basePath, HoodieTableType tableType,
-                                                     String tableName, String 
archiveLogFolder, String payloadClassName,
-                                                     Integer 
timelineLayoutVersion,
-                                                     String baseFileFormat, 
String preCombineField,
-                                                     String 
bootstrapIndexClass, String bootstrapBasePath) throws IOException {
-    Properties properties = new Properties();
-    properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, 
tableName);
-    properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, 
tableType.name());
-    properties.setProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME, 
String.valueOf(HoodieTableVersion.current().versionCode()));
-    if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != 
null) {
-      properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, 
payloadClassName);
-    }
-
-    if (null != archiveLogFolder) {
-      properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, 
archiveLogFolder);
-    }
-
-    if (null != timelineLayoutVersion) {
-      properties.put(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION, 
String.valueOf(timelineLayoutVersion));
-    }
-
-    if (null != baseFileFormat) {
-      
properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, 
baseFileFormat.toUpperCase());
-    }
-
-    if (null != bootstrapIndexClass) {
-      properties.put(HoodieTableConfig.HOODIE_BOOTSTRAP_INDEX_CLASS_PROP_NAME, 
bootstrapIndexClass);
-    }
-
-    if (null != bootstrapBasePath) {
-      properties.put(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH, 
bootstrapBasePath);
-    }
-
-    if (null != preCombineField) {
-      properties.put(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD, 
preCombineField);
-    }
-    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, 
basePath, properties);
-  }
-
-  /**
    * Helper method to initialize a given path as a hoodie table with configs 
passed in as as Properties.
    *
    * @return Instance of HoodieTableMetaClient
@@ -688,4 +583,173 @@ public class HoodieTableMetaClient implements 
Serializable {
     }
   }
 
+  public static PropertyBuilder withPropertyBuilder() {
+    return new PropertyBuilder();
+  }
+
+  public static class PropertyBuilder {
+
+    private HoodieTableType tableType;
+    private String tableName;
+    private String archiveLogFolder;
+    private String payloadClassName;
+    private Integer timelineLayoutVersion;
+    private String baseFileFormat;
+    private String preCombineField;
+    private String bootstrapIndexClass;
+    private String bootstrapBasePath;
+
+    private PropertyBuilder() {
+
+    }
+
+    public PropertyBuilder setTableType(HoodieTableType tableType) {
+      this.tableType = tableType;
+      return this;
+    }
+
+    public PropertyBuilder setTableType(String tableType) {
+      return setTableType(HoodieTableType.valueOf(tableType));
+    }
+
+    public PropertyBuilder setTableName(String tableName) {
+      this.tableName = tableName;
+      return this;
+    }
+
+    public PropertyBuilder setArchiveLogFolder(String archiveLogFolder) {
+      this.archiveLogFolder = archiveLogFolder;
+      return this;
+    }
+
+    public PropertyBuilder setPayloadClassName(String payloadClassName) {
+      this.payloadClassName = payloadClassName;
+      return this;
+    }
+
+    public PropertyBuilder setPayloadClass(Class<? extends 
HoodieRecordPayload> payloadClass) {
+      return setPayloadClassName(payloadClass.getName());
+    }
+
+    public PropertyBuilder setTimelineLayoutVersion(Integer 
timelineLayoutVersion) {
+      this.timelineLayoutVersion = timelineLayoutVersion;
+      return this;
+    }
+
+    public PropertyBuilder setBaseFileFormat(String baseFileFormat) {
+      this.baseFileFormat = baseFileFormat;
+      return this;
+    }
+
+    public PropertyBuilder setPreCombineField(String preCombineField) {
+      this.preCombineField = preCombineField;
+      return this;
+    }
+
+    public PropertyBuilder setBootstrapIndexClass(String bootstrapIndexClass) {
+      this.bootstrapIndexClass = bootstrapIndexClass;
+      return this;
+    }
+
+    public PropertyBuilder setBootstrapBasePath(String bootstrapBasePath) {
+      this.bootstrapBasePath = bootstrapBasePath;
+      return this;
+    }
+
+    public PropertyBuilder fromMetaClient(HoodieTableMetaClient metaClient) {
+      return setTableType(metaClient.getTableType())
+        .setTableName(metaClient.getTableConfig().getTableName())
+        .setArchiveLogFolder(metaClient.getArchivePath())
+        .setPayloadClassName(metaClient.getTableConfig().getPayloadClass());
+    }
+
+    public PropertyBuilder fromProperties(Properties properties) {
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME)) {
+        
setTableName(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME));
+      }
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME)) {
+        
setTableType(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME));
+      }
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME)) {
+        setArchiveLogFolder(
+            
properties.getProperty(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME));
+      }
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME)) {
+        setPayloadClassName(
+            
properties.getProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME));
+      }
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION)) {
+        setTimelineLayoutVersion(Integer
+            
.parseInt(properties.getProperty(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION)));
+      }
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME)) {
+        setBaseFileFormat(
+            
properties.getProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME));
+      }
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_BOOTSTRAP_INDEX_CLASS_PROP_NAME))
 {
+        setBootstrapIndexClass(
+            
properties.getProperty(HoodieTableConfig.HOODIE_BOOTSTRAP_INDEX_CLASS_PROP_NAME));
+      }
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH)) {
+        
setBootstrapBasePath(properties.getProperty(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH));
+      }
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD)) {
+        
setPreCombineField(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD));
+      }
+      return this;
+    }
+
+    public Properties build() {
+      ValidationUtils.checkArgument(tableType != null, "tableType is null");
+      ValidationUtils.checkArgument(tableName != null, "tableName is null");
+
+      Properties properties = new Properties();
+      properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, 
tableName);
+      properties.setProperty(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, 
tableType.name());
+      properties.setProperty(HoodieTableConfig.HOODIE_TABLE_VERSION_PROP_NAME,
+          String.valueOf(HoodieTableVersion.current().versionCode()));
+      if (tableType == HoodieTableType.MERGE_ON_READ && payloadClassName != 
null) {
+        
properties.setProperty(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, 
payloadClassName);
+      }
+
+      if (null != archiveLogFolder) {
+        properties.put(HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, 
archiveLogFolder);
+      }
+
+      if (null != timelineLayoutVersion) {
+        properties.put(HoodieTableConfig.HOODIE_TIMELINE_LAYOUT_VERSION,
+            String.valueOf(timelineLayoutVersion));
+      }
+
+      if (null != baseFileFormat) {
+        
properties.setProperty(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME,
+            baseFileFormat.toUpperCase());
+      }
+
+      if (null != bootstrapIndexClass) {
+        properties
+          .put(HoodieTableConfig.HOODIE_BOOTSTRAP_INDEX_CLASS_PROP_NAME, 
bootstrapIndexClass);
+      }
+
+      if (null != bootstrapBasePath) {
+        properties.put(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH, 
bootstrapBasePath);
+      }
+
+      if (null != preCombineField) {
+        properties.put(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD, 
preCombineField);
+      }
+      return properties;
+    }
+
+    /**
+     * Initializes the table with the properties built by this builder.
+     *
+     * @param configuration The hadoop config.
+     * @param basePath The base path for hoodie table.
+     */
+    public HoodieTableMetaClient initTable(Configuration configuration, String 
basePath)
+        throws IOException {
+      return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, 
basePath, build());
+    }
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index d80de8e..5c4c911 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -109,9 +109,11 @@ public class TestHoodieActiveTimeline extends 
HoodieCommonTestHarness {
         "Check the instants stream");
 
     // Backwards compatibility testing for reading compaction plans
-    metaClient = 
HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(),
-        metaClient.getBasePath(), metaClient.getTableType(), 
metaClient.getTableConfig().getTableName(),
-        metaClient.getArchivePath(), 
metaClient.getTableConfig().getPayloadClass(), VERSION_0);
+    metaClient = HoodieTableMetaClient.withPropertyBuilder()
+      .fromMetaClient(metaClient)
+      .setTimelineLayoutVersion(VERSION_0)
+      .initTable(metaClient.getHadoopConf(), metaClient.getBasePath());
+
     HoodieInstant instant6 = new HoodieInstant(State.REQUESTED, 
HoodieTimeline.COMPACTION_ACTION, "9");
     byte[] dummy = new byte[5];
     HoodieActiveTimeline oldTimeline = new HoodieActiveTimeline(
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index d94f41f..d5d5cb2 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -101,9 +101,12 @@ public class HoodieTestUtils {
   public static HoodieTableMetaClient init(Configuration hadoopConf, String 
basePath, HoodieTableType tableType,
                                            Properties properties)
       throws IOException {
-    properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, 
RAW_TRIPS_TEST_NAME);
-    properties.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, 
tableType.name());
-    properties.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, 
HoodieAvroPayload.class.getName());
+    properties = HoodieTableMetaClient.withPropertyBuilder()
+      .setTableName(RAW_TRIPS_TEST_NAME)
+      .setTableType(tableType)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .fromProperties(properties)
+      .build();
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, 
basePath, properties);
   }
 
diff --git 
a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
 
b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
index 1ee5d1a..4d06e4d 100644
--- 
a/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
+++ 
b/hudi-examples/src/main/java/org/apache/hudi/examples/java/HoodieJavaWriteClientExample.java
@@ -72,8 +72,11 @@ public class HoodieJavaWriteClientExample {
     Path path = new Path(tablePath);
     FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
     if (!fs.exists(path)) {
-      HoodieTableMetaClient.initTableType(hadoopConf, tablePath, 
HoodieTableType.valueOf(tableType),
-          tableName, HoodieAvroPayload.class.getName());
+      HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(tableType)
+        .setTableName(tableName)
+        .setPayloadClassName(HoodieAvroPayload.class.getName())
+        .initTable(hadoopConf, tablePath);
     }
 
     // Create the write client to write some records in
diff --git 
a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
 
b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index b606c52..257519b 100644
--- 
a/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ 
b/hudi-examples/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -85,8 +85,11 @@ public class HoodieWriteClientExample {
       Path path = new Path(tablePath);
       FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration());
       if (!fs.exists(path)) {
-        HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), 
tablePath, HoodieTableType.valueOf(tableType),
-                tableName, HoodieAvroPayload.class.getName());
+        HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(tableType)
+          .setTableName(tableName)
+          .setPayloadClass(HoodieAvroPayload.class)
+          .initTable(jsc.hadoopConfiguration(), tablePath);
       }
 
       // Create the write client to write some records in
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 81a234a..cc161ce 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.util;
 
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.TablePathUtils;
 import org.apache.hudi.exception.HoodieException;
@@ -284,14 +283,13 @@ public class StreamerUtil {
     // Hadoop FileSystem
     try (FileSystem fs = FSUtils.getFs(basePath, hadoopConf)) {
       if (!fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))) {
-        HoodieTableMetaClient.initTableType(
-            hadoopConf,
-            basePath,
-            HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
-            conf.getString(FlinkOptions.TABLE_NAME),
-            DEFAULT_ARCHIVE_LOG_FOLDER,
-            conf.getString(FlinkOptions.PAYLOAD_CLASS),
-            1);
+        HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(conf.getString(FlinkOptions.TABLE_TYPE))
+          .setTableName(conf.getString(FlinkOptions.TABLE_NAME))
+          .setPayloadClassName(conf.getString(FlinkOptions.PAYLOAD_CLASS))
+          .setArchiveLogFolder(DEFAULT_ARCHIVE_LOG_FOLDER)
+          .setTimelineLayoutVersion(1)
+          .initTable(hadoopConf, basePath);
         LOG.info("Table initialized under base path {}", basePath);
       } else {
         LOG.info("Table [{}/{}] already exists, no need to initialize the 
table",
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index aeb8748..00f4d1d 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -21,7 +21,6 @@ package org.apache.hudi.integ.testsuite;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.exception.HoodieException;
@@ -94,8 +93,11 @@ public class HoodieTestSuiteJob {
     this.keyGenerator = (BuiltinKeyGenerator) 
DataSourceUtils.createKeyGenerator(props);
 
     if (!fs.exists(new Path(cfg.targetBasePath))) {
-      HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), 
cfg.targetBasePath,
-          HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, 
"archived");
+      HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(cfg.tableType)
+        .setTableName(cfg.targetTableName)
+        .setArchiveLogFolder("archived")
+        .initTable(jsc.hadoopConfiguration(), cfg.targetBasePath);
     }
 
     if (cfg.cleanInput) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ef28191..1f1dc4d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -38,7 +38,7 @@ import 
org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
-import org.apache.hudi.internal.{DataSourceInternalWriterHelper, 
HoodieDataSourceInternalWriter}
+import org.apache.hudi.internal.DataSourceInternalWriterHelper
 import org.apache.hudi.sync.common.AbstractSyncTool
 import org.apache.log4j.LogManager
 import org.apache.spark.SPARK_VERSION
@@ -111,9 +111,14 @@ private[hudi] object HoodieSparkSqlWriter {
       if (!tableExists) {
         val archiveLogFolder = parameters.getOrElse(
           HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
-        val tableMetaClient = 
HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
-          tableType, tblName, archiveLogFolder, 
parameters(PAYLOAD_CLASS_OPT_KEY),
-          null.asInstanceOf[String], 
parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+
+        val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(tableType)
+          .setTableName(tblName)
+          .setArchiveLogFolder(archiveLogFolder)
+          .setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY))
+          .setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+          .initTable(sparkContext.hadoopConfiguration, path.get)
         tableConfig = tableMetaClient.getTableConfig
       }
 
@@ -261,10 +266,15 @@ private[hudi] object HoodieSparkSqlWriter {
     if (!tableExists) {
       val archiveLogFolder = parameters.getOrElse(
         HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
-      HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path,
-        HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, 
parameters(PAYLOAD_CLASS_OPT_KEY),
-        null, parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null),
-        bootstrapIndexClass, bootstrapBasePath)
+      HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(HoodieTableType.valueOf(tableType))
+          .setTableName(tableName)
+          .setArchiveLogFolder(archiveLogFolder)
+          .setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY))
+          .setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+          .setBootstrapIndexClass(bootstrapIndexClass)
+          .setBootstrapBasePath(bootstrapBasePath)
+          .initTable(sparkContext.hadoopConfiguration, path)
     }
 
     val jsc = new JavaSparkContext(sqlContext.sparkContext)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index a98152a..fbd0e7a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -44,8 +44,11 @@ class TestStreamingSource extends StreamTest {
   test("test cow stream source") {
     withTempDir { inputDir =>
       val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
-      HoodieTableMetaClient.initTableType(spark.sessionState.newHadoopConf(), 
tablePath,
-        COPY_ON_WRITE, getTableName(tablePath), 
DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+      HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(COPY_ON_WRITE)
+          .setTableName(getTableName(tablePath))
+          .setPayloadClassName(DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+          .initTable(spark.sessionState.newHadoopConf(), tablePath)
 
       addData(tablePath, Seq(("1", "a1", "10", "000")))
       val df = spark.readStream
@@ -91,8 +94,11 @@ class TestStreamingSource extends StreamTest {
   test("test mor stream source") {
     withTempDir { inputDir =>
       val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream"
-      HoodieTableMetaClient.initTableType(spark.sessionState.newHadoopConf(), 
tablePath,
-        MERGE_ON_READ, getTableName(tablePath), 
DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+      HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(MERGE_ON_READ)
+        .setTableName(getTableName(tablePath))
+        .setPayloadClassName(DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+        .initTable(spark.sessionState.newHadoopConf(), tablePath)
 
       addData(tablePath, Seq(("1", "a1", "10", "000")))
       val df = spark.readStream
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 0909053..5feca25 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -126,8 +126,11 @@ public class HiveTestUtil {
 
   public static void clear() throws IOException {
     fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
-    HoodieTableMetaClient.initTableType(configuration, 
hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
-        hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(HoodieTableType.COPY_ON_WRITE)
+      .setTableName(hiveSyncConfig.tableName)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .initTable(configuration, hiveSyncConfig.basePath);
 
     HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, 
hiveServer.getHiveConf(), fileSystem);
     for (String tableName : createdTablesSet) {
@@ -161,8 +164,12 @@ public class HiveTestUtil {
       throws IOException, URISyntaxException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
-    HoodieTableMetaClient.initTableType(configuration, 
hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
-        hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(HoodieTableType.COPY_ON_WRITE)
+      .setTableName(hiveSyncConfig.tableName)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .initTable(configuration, hiveSyncConfig.basePath);
+
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
     DateTime dateTime = DateTime.now();
@@ -177,8 +184,11 @@ public class HiveTestUtil {
       throws IOException, URISyntaxException, InterruptedException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
-    HoodieTableMetaClient.initTableType(configuration, 
hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
-        hiveSyncConfig.tableName, HoodieAvroPayload.class.getName());
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(HoodieTableType.MERGE_ON_READ)
+      .setTableName(hiveSyncConfig.tableName)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .initTable(configuration, hiveSyncConfig.basePath);
 
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 06245b9..1864795 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
@@ -135,9 +134,10 @@ public class HDFSParquetImporter implements Serializable {
 
       if (!fs.exists(new Path(cfg.targetPath))) {
         // Initialize target hoodie table.
-        Properties properties = new Properties();
-        properties.put(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, 
cfg.tableName);
-        properties.put(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, 
cfg.tableType);
+        Properties properties = HoodieTableMetaClient.withPropertyBuilder()
+            .setTableName(cfg.tableName)
+            .setTableType(cfg.tableType)
+            .build();
         
HoodieTableMetaClient.initTableAndGetMetaClient(jsc.hadoopConfiguration(), 
cfg.targetPath, properties);
       }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
index 5e34c20..bcf4025 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
@@ -23,7 +23,6 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
@@ -170,10 +169,15 @@ public class BootstrapExecutor  implements Serializable {
       throw new HoodieException("target base path already exists at " + 
cfg.targetBasePath
           + ". Cannot bootstrap data on top of an existing table");
     }
-
-    HoodieTableMetaClient.initTableTypeWithBootstrap(new 
Configuration(jssc.hadoopConfiguration()),
-        cfg.targetBasePath, HoodieTableType.valueOf(cfg.tableType), 
cfg.targetTableName, "archived", cfg.payloadClassName,
-        cfg.baseFileFormat, cfg.bootstrapIndexClass, bootstrapBasePath);
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(cfg.tableType)
+        .setTableName(cfg.targetTableName)
+        .setArchiveLogFolder("archived")
+        .setPayloadClassName(cfg.payloadClassName)
+        .setBaseFileFormat(cfg.baseFileFormat)
+        .setBootstrapIndexClass(cfg.bootstrapIndexClass)
+        .setBootstrapBasePath(bootstrapBasePath)
+        .initTable(new Configuration(jssc.hadoopConfiguration()), 
cfg.targetBasePath);
   }
 
   public HoodieWriteConfig getBootstrapConfig() {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 36e2c99..17eee8e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -236,8 +235,14 @@ public class DeltaSync implements Serializable {
       }
     } else {
       this.commitTimelineOpt = Option.empty();
-      HoodieTableMetaClient.initTableType(new 
Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
-          HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, 
"archived", cfg.payloadClassName, cfg.baseFileFormat);
+      HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(cfg.tableType)
+          .setTableName(cfg.targetTableName)
+          .setArchiveLogFolder("archived")
+          .setPayloadClassName(cfg.payloadClassName)
+          .setBaseFileFormat(cfg.baseFileFormat)
+          .initTable(new Configuration(jssc.hadoopConfiguration()),
+            cfg.targetBasePath);
     }
   }
 
@@ -321,8 +326,13 @@ public class DeltaSync implements Serializable {
         }
       }
     } else {
-      HoodieTableMetaClient.initTableType(new 
Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
-          HoodieTableType.valueOf(cfg.tableType), cfg.targetTableName, 
"archived", cfg.payloadClassName, cfg.baseFileFormat);
+      HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(cfg.tableType)
+          .setTableName(cfg.targetTableName)
+          .setArchiveLogFolder("archived")
+          .setPayloadClassName(cfg.payloadClassName)
+          .setBaseFileFormat(cfg.baseFileFormat)
+          .initTable(new Configuration(jssc.hadoopConfiguration()), 
cfg.targetBasePath);
     }
 
     if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java
index 5f51174..aefa49f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieSnapshotExporter.java
@@ -79,9 +79,12 @@ public class TestHoodieSnapshotExporter extends 
FunctionalTestHarness {
     sourcePath = dfsBasePath() + "/source/";
     targetPath = dfsBasePath() + "/target/";
     dfs().mkdirs(new Path(sourcePath));
-    HoodieTableMetaClient
-        .initTableType(jsc().hadoopConfiguration(), sourcePath, 
HoodieTableType.COPY_ON_WRITE, TABLE_NAME,
-            HoodieAvroPayload.class.getName());
+
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(HoodieTableType.COPY_ON_WRITE)
+      .setTableName(TABLE_NAME)
+      .setPayloadClass(HoodieAvroPayload.class)
+      .initTable(jsc().hadoopConfiguration(), sourcePath);
 
     // Prepare data as source Hudi dataset
     HoodieWriteConfig cfg = getHoodieWriteConfig(sourcePath);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index b83fa78..946db12 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -180,8 +180,11 @@ public class UtilitiesTestBase {
     // Create Dummy hive sync config
     HiveSyncConfig hiveSyncConfig = getHiveSyncConfig("/dummy", "dummy");
     hiveConf.addResource(hiveServer.getHiveConf());
-    HoodieTableMetaClient.initTableType(dfs.getConf(), 
hiveSyncConfig.basePath, HoodieTableType.COPY_ON_WRITE,
-        hiveSyncConfig.tableName, null);
+    HoodieTableMetaClient.withPropertyBuilder()
+      .setTableType(HoodieTableType.COPY_ON_WRITE)
+      .setTableName(hiveSyncConfig.tableName)
+      .initTable(dfs.getConf(), hiveSyncConfig.basePath);
+
     HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveConf, 
dfs);
     client.updateHiveSQL("drop database if exists " + 
hiveSyncConfig.databaseName);
     client.updateHiveSQL("create database " + hiveSyncConfig.databaseName);

Reply via email to