This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fb8556  Add ability to provide multi-region (global) data consistency 
across HMS in different regions (#2542)
0fb8556 is described below

commit 0fb8556b0d9274aef650a46bb82a8cf495d4450b
Author: s-sanjay <[email protected]>
AuthorDate: Fri Jun 25 08:56:26 2021 +0530

    Add ability to provide multi-region (global) data consistency across HMS in 
different regions (#2542)
    
    [global-hive-sync-tool] Add a global hive sync tool to sync a hudi table 
across clusters. Add a way to roll back the replicated timestamp if we fail to 
sync or if we only partly sync
    
    Co-authored-by: Jagmeet Bali <[email protected]>
---
 .../common/bootstrap/index/BootstrapIndex.java     |   6 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |   4 +-
 .../apache/hudi/common/util/ReflectionUtils.java   |   2 +-
 .../org/apache/hudi/hadoop/InputPathHandler.java   |   9 +-
 .../apache/hudi/hadoop/utils/HoodieHiveUtils.java  |   8 +-
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |   1 +
 ...llyConsistentTimeStampFilteringInputFormat.java | 121 +++++++++
 .../hudi/hadoop/TestHoodieParquetInputFormat.java  |   4 +-
 .../apache/hudi/hadoop/TestInputPathHandler.java   |  17 ++
 hudi-sync/hudi-hive-sync/pom.xml                   |  10 +
 .../hudi-hive-sync/run_hive_global_commit_tool.sh  |  69 +++++
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |   1 +
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |  12 +-
 .../org/apache/hudi/hive/HoodieHiveClient.java     |  80 +++++-
 .../hive/replication/GlobalHiveSyncConfig.java     |  54 ++++
 .../hudi/hive/replication/GlobalHiveSyncTool.java  | 105 ++++++++
 .../hive/replication/HiveSyncGlobalCommit.java     |  37 +++
 .../replication/HiveSyncGlobalCommitConfig.java    |  98 +++++++
 .../hive/replication/HiveSyncGlobalCommitTool.java | 136 ++++++++++
 .../hive/replication/ReplicationStateSync.java     |  90 +++++++
 .../hudi/hive/TestHiveSyncGlobalCommitTool.java    | 128 +++++++++
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 286 ++++++++++++++++++++-
 .../hudi/hive/testutils/HiveTestService.java       |  34 ++-
 .../apache/hudi/hive/testutils/HiveTestUtil.java   |  59 +++--
 .../apache/hudi/hive/testutils/TestCluster.java    | 271 +++++++++++++++++++
 .../testutils/TestHiveSyncGlobalCommitTool.java    | 133 ++++++++++
 .../hive-global-commit-config.xml                  |  27 ++
 27 files changed, 1731 insertions(+), 71 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
index 6aafeca..abd3ac5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
@@ -21,12 +21,12 @@ package org.apache.hudi.common.bootstrap.index;
 import org.apache.hudi.common.model.BootstrapFileMapping;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ReflectionUtils;
 
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ReflectionUtils;
 
 /**
  * Bootstrap Index Interface.
@@ -161,6 +161,6 @@ public abstract class BootstrapIndex implements 
Serializable {
 
   public static BootstrapIndex getBootstrapIndex(HoodieTableMetaClient 
metaClient) {
     return ((BootstrapIndex)(ReflectionUtils.loadClass(
-        metaClient.getTableConfig().getBootstrapIndexClass(), metaClient)));
+        metaClient.getTableConfig().getBootstrapIndexClass(), new 
Class[]{HoodieTableMetaClient.class}, metaClient)));
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index cf66f16..3285a00 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -98,8 +98,8 @@ public class HoodieTableMetaClient implements Serializable {
   private ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
 
   private HoodieTableMetaClient(Configuration conf, String basePath, boolean 
loadActiveTimelineOnLoad,
-                               ConsistencyGuardConfig consistencyGuardConfig, 
Option<TimelineLayoutVersion> layoutVersion,
-                               String payloadClassName) {
+                                ConsistencyGuardConfig consistencyGuardConfig, 
Option<TimelineLayoutVersion> layoutVersion,
+                                String payloadClassName) {
     LOG.info("Loading HoodieTableMetaClient from " + basePath);
     this.consistencyGuardConfig = consistencyGuardConfig;
     this.hadoopConf = new SerializableConfiguration(conf);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index 57d11d0..2895f46 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -87,7 +87,7 @@ public class ReflectionUtils {
     try {
       return 
getClass(clazz).getConstructor(constructorArgTypes).newInstance(constructorArgs);
     } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException | NoSuchMethodException e) {
-      throw new HoodieException("Unable to instantiate class ", e);
+      throw new HoodieException("Unable to instantiate class " + clazz, e);
     }
   }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
index 0a5055a..f7adf38 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
@@ -39,6 +39,14 @@ import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaCl
  * InputPathHandler takes in a set of input paths and incremental tables list. 
Then, classifies the
  * input paths to incremental, snapshot paths and non-hoodie paths. This is 
then accessed later to
  * mutate the JobConf before processing incremental mode queries and snapshot 
queries.
+ *
+ * Note: We receive the jobConf of a MapReduce or Spark job. The properties in 
the jobConf are of two
+ * types: session properties and table properties from the metastore. While 
session properties are
+ * common to all the tables in a query, the table properties are unique per 
table, so there is no
+ * need to check whether they belong to the table for which the path handler is 
now instantiated.
+ * The jobConf has all table properties such as name, last modification time 
and so on, which are
+ * unique to a table. This class is written in such a way that it can handle 
multiple tables and
+ * properties unique to a table, but for a table-level property such a check is 
not required.
  */
 public class InputPathHandler {
 
@@ -63,7 +71,6 @@ public class InputPathHandler {
   /**
    * Takes in the original InputPaths and classifies each of them into 
incremental, snapshot and
    * non-hoodie InputPaths. The logic is as follows:
-   *
    * 1. Check if an inputPath starts with the same basepath as any of the 
metadata basepaths we know
    *    1a. If yes, this belongs to a Hoodie table that we already know about. 
Simply classify this
    *        as incremental or snapshot - We can get the table name of this 
inputPath from the
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
index d9983bd..d5d9d9c 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
@@ -18,13 +18,14 @@
 
 package org.apache.hudi.hadoop.utils;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -73,6 +74,7 @@ public class HoodieHiveUtils {
   public static final int MAX_COMMIT_ALL = -1;
   public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
   public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = 
Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
+  public static final String GLOBALLY_CONSISTENT_READ_TIMESTAMP = 
"last_replication_timestamp";
 
   public static boolean stopAtCompaction(JobContext job, String tableName) {
     String compactionPropName = 
String.format(HOODIE_STOP_AT_COMPACTION_PATTERN, tableName);
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index b39ee34..26fbdda 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -442,6 +442,7 @@ public class HoodieInputFormatUtils {
         }
 
         HoodieTimeline timeline = 
HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), 
job, metaClient);
+
         HoodieTableFileSystemView fsView = 
fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
             
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext, 
tableMetaClient, buildMetadataConfig(job), timeline));
         List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java
new file mode 100644
index 0000000..50e4a6e
--- /dev/null
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestGloballyConsistentTimeStampFilteringInputFormat
+    extends TestHoodieParquetInputFormat {
+
+  @BeforeEach
+  public void setUp() {
+    super.setUp();
+  }
+
+  @Test
+  public void testInputFormatLoad() throws IOException {
+    super.testInputFormatLoad();
+
+    // set filtering timestamp to 0; now the timeline won't have any commits.
+    InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "0");
+
+    Assertions.assertThrows(HoodieIOException.class, () -> 
inputFormat.getSplits(jobConf, 10));
+    Assertions.assertThrows(HoodieIOException.class, () -> 
inputFormat.listStatus(jobConf));
+  }
+
+  @Test
+  public void testInputFormatUpdates() throws IOException {
+    super.testInputFormatUpdates();
+
+    // set the globally replicated timestamp to 100 so only commit 100 is read 
and the update (commit 200) is ignored.
+    InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "100");
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+    assertEquals(10, files.length);
+
+    ensureFilesInCommit("5 files have been updated to commit 200. but should 
get filtered out ",
+        files,"200", 0);
+    ensureFilesInCommit("We should see 10 files from commit 100 ", files, 
"100", 10);
+  }
+
+  @Override
+  public void testIncrementalSimple() throws IOException {
+    // setting filtering timestamp to zero should not in any way alter the 
result of the test which
+    // pulls in zero files due to incremental ts being the actual commit time
+    jobConf.set(HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP, "0");
+    super.testIncrementalSimple();
+  }
+
+  @Override
+  public void testIncrementalWithMultipleCommits() throws IOException {
+    super.testIncrementalWithMultipleCommits();
+
+    // set the globally replicated timestamp to 400 so commits 500 and 600 do 
not show up
+    InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "400");
+    InputFormatTestUtil.setupIncremental(jobConf, "100", 
HoodieHiveUtils.MAX_COMMIT_ALL);
+
+    FileStatus[] files = inputFormat.listStatus(jobConf);
+
+    assertEquals(
+         5, files.length,"Pulling ALL commits from 100, should get us the 3 
files from 400 commit, 1 file from 300 "
+            + "commit and 1 file from 200 commit");
+    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 3 files 
from 400 commit",
+        files, "400", 3);
+    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files 
from 300 commit",
+        files, "300", 1);
+    ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files 
from 200 commit",
+        files, "200", 1);
+
+    List<String> commits = Arrays.asList("100", "200", "300", "400", "500", 
"600");
+    for (int idx = 0; idx < commits.size(); ++idx) {
+      for (int jdx = 0; jdx < commits.size(); ++jdx) {
+        InputFormatTestUtil.setupIncremental(jobConf, commits.get(idx), 
HoodieHiveUtils.MAX_COMMIT_ALL);
+        InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, 
commits.get(jdx));
+
+        files = inputFormat.listStatus(jobConf);
+
+        if (jdx <= idx) {
+          assertEquals(0, files.length,"all commits should be filtered");
+        } else {
+          // only commits up to the timestamp are allowed
+          for (FileStatus file : files) {
+            String commitTs = FSUtils.getCommitTime(file.getPath().getName());
+            assertTrue(commits.indexOf(commitTs) <= jdx);
+            assertTrue(commits.indexOf(commitTs) > idx);
+          }
+        }
+      }
+    }
+  }
+}
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index c4fed98..c45c614 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -65,8 +65,8 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestHoodieParquetInputFormat {
 
-  private HoodieParquetInputFormat inputFormat;
-  private JobConf jobConf;
+  protected HoodieParquetInputFormat inputFormat;
+  protected JobConf jobConf;
   private final HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
   private final String baseFileExtension = baseFileFormat.getFileExtension();
 
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
index 3a8b197..0287318 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
@@ -23,10 +23,12 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.jupiter.api.AfterAll;
@@ -169,6 +171,21 @@ public class TestInputPathHandler {
     assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths));
   }
 
+  @Test
+  public void testInputPathHandlerWithGloballyReplicatedTimeStamp() throws 
IOException {
+    JobConf jobConf = new JobConf();
+    jobConf.set(HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP, "1");
+    inputPathHandler = new InputPathHandler(dfs.getConf(), inputPaths.toArray(
+        new Path[inputPaths.size()]), incrementalTables);
+    List<Path> actualPaths = 
inputPathHandler.getGroupedIncrementalPaths().values().stream()
+        .flatMap(List::stream).collect(Collectors.toList());
+    assertTrue(actualComparesToExpected(actualPaths, incrementalPaths));
+    actualPaths = inputPathHandler.getSnapshotPaths();
+    assertTrue(actualComparesToExpected(actualPaths, snapshotPaths));
+    actualPaths = inputPathHandler.getNonHoodieInputPaths();
+    assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths));
+  }
+
   private boolean actualComparesToExpected(List<Path> actualPaths, List<Path> 
expectedPaths) {
     if (actualPaths.size() != expectedPaths.size()) {
       return false;
diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml
index c44f785..fd63028 100644
--- a/hudi-sync/hudi-hive-sync/pom.xml
+++ b/hudi-sync/hudi-hive-sync/pom.xml
@@ -30,6 +30,8 @@
 
   <properties>
     <main.basedir>${project.parent.basedir}</main.basedir>
+
+    <jetty.version>7.6.0.v20120127</jetty.version>
   </properties>
 
   <dependencies>
@@ -148,6 +150,14 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- Needed for running HiveServer for Tests -->
+    <dependency>
+      <groupId>org.eclipse.jetty.aggregate</groupId>
+      <artifactId>jetty-all</artifactId>
+      <scope>test</scope>
+      <version>${jetty.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-api</artifactId>
diff --git a/hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh 
b/hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh
new file mode 100755
index 0000000..b7e6cd2
--- /dev/null
+++ b/hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+# A tool to sync the hudi table to hive from different clusters. Similar to 
HiveSyncTool but syncs it to more
+# than one hive cluster ( currently a local and remote cluster). The common 
timestamp that was synced is stored as a new table property
+# This is most useful when we want to ensure consistent reads across different 
hive clusters.
+# If that is not a requirement then it is better to run HiveSyncTool separately.
+# Note:
+#   The tool tries to be transactional but does not guarantee it. If the sync 
fails midway in one cluster it will try to roll back the committed
+#   timestamp from already successful sync on other clusters but that can also 
fail.
+#   The tool does not roll back any synced partitions but only the timestamp.
+
+function error_exit {
+    echo "$1" >&2   ## Send message to stderr. Exclude >&2 if you don't want 
it that way.
+    exit "${2:-1}"  ## Return a code specified by $2 or 1 by default.
+}
+
+if [ -z "${HADOOP_HOME}" ]; then
+  error_exit "Please make sure the environment variable HADOOP_HOME is setup"
+fi
+
+if [ -z "${HIVE_HOME}" ]; then
+  error_exit "Please make sure the environment variable HIVE_HOME is setup"
+fi
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+#Ensure we pick the right jar even for hive11 builds
+HUDI_HIVE_UBER_JAR=`ls -c 
$DIR/../packaging/hudi-hive-bundle/target/hudi-hive-*.jar | grep -v source | 
head -1`
+
+if [ -z "$HADOOP_CONF_DIR" ]; then
+  echo "setting hadoop conf dir"
+  HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
+fi
+
+## Include only specific packages from HIVE_HOME/lib to avoid version 
mismatches
+HIVE_EXEC=`ls ${HIVE_HOME}/lib/hive-exec-*.jar | tr '\n' ':'`
+HIVE_SERVICE=`ls ${HIVE_HOME}/lib/hive-service-*.jar | grep -v rpc | tr '\n' 
':'`
+HIVE_METASTORE=`ls ${HIVE_HOME}/lib/hive-metastore-*.jar | tr '\n' ':'`
+HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | tr '\n' ':'`
+if [ -z "${HIVE_JDBC}" ]; then
+  HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n' 
':'`
+fi
+HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'`
+HIVE_NUCLEUS=`ls ${HIVE_HOME}/lib/datanucleus*.jar | tr '\n' ':'`
+HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON:$HIVE_NUCLEUS
+
+HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*
+
+if ! [ -z "$HIVE_CONF_DIR" ]; then
+  error_exit "Don't set HIVE_CONF_DIR; use config xml file"
+fi
+
+echo "Running Command : java -cp 
$HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}lib/* 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool $@"
+java -cp 
$HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}lib/* 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool "$@"
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index e4e7962..41c419d 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -104,6 +104,7 @@ public class HiveSyncConfig implements Serializable {
   @Parameter(names = {"--decode-partition"}, description = "Decode the 
partition value if the partition has encoded during writing")
   public Boolean decodePartition = false;
 
+  // NOTE: subclasses that add fields should also extend this copy logic (e.g. 
GlobalHiveSyncConfig.copy)
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
     newConfig.basePath = cfg.basePath;
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 0dbe97f..7264c8d 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -58,10 +58,10 @@ public class HiveSyncTool extends AbstractSyncTool {
   public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
   public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
 
-  private final HiveSyncConfig cfg;
-  private HoodieHiveClient hoodieHiveClient = null;
-  private String snapshotTableName = null;
-  private Option<String> roTableName = null;
+  protected final HiveSyncConfig cfg;
+  protected HoodieHiveClient hoodieHiveClient = null;
+  protected String snapshotTableName = null;
+  protected Option<String> roTableName = null;
 
   public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem 
fs) {
     super(configuration.getAllProperties(), fs);
@@ -127,8 +127,8 @@ public class HiveSyncTool extends AbstractSyncTool {
       }
     }
   }
-
-  private void syncHoodieTable(String tableName, boolean 
useRealtimeInputFormat,
+  
+  protected void syncHoodieTable(String tableName, boolean 
useRealtimeInputFormat,
                                boolean readAsOptimized) {
     LOG.info("Trying to sync hoodie table " + tableName + " with base path " + 
hoodieHiveClient.getBasePath()
         + " of type " + hoodieHiveClient.getTableType());
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 3ae94dd..9d02145 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -18,12 +18,6 @@
 
 package org.apache.hudi.hive;
 
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.StorageSchemes;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -32,15 +26,22 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.parquet.schema.MessageType;
@@ -60,6 +61,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+
 public class HoodieHiveClient extends AbstractSyncHoodieClient {
 
   private static final String HOODIE_LAST_COMMIT_TIME_SYNC = 
"last_commit_time_sync";
@@ -402,7 +405,19 @@ public class HoodieHiveClient extends 
AbstractSyncHoodieClient {
         closeQuietly(null, stmt);
       }
     } else {
-      updateHiveSQLUsingHiveDriver(s);
+      CommandProcessorResponse response = updateHiveSQLUsingHiveDriver(s);
+      if (response == null) {
+        throw new HoodieHiveSyncException("Failed in executing SQL null 
response" + s);
+      }
+      if (response.getResponseCode() != 0) {
+        LOG.error(String.format("Failure in SQL response %s", 
response.toString()));
+        if (response.getException() != null) {
+          throw new HoodieHiveSyncException(
+              String.format("Failed in executing SQL %s", s), 
response.getException());
+        } else {
+          throw new HoodieHiveSyncException(String.format("Failed in executing 
SQL %s", s));
+        }
+      }
     }
   }
 
@@ -476,13 +491,58 @@ public class HoodieHiveClient extends 
AbstractSyncHoodieClient {
     }
   }
 
+  public Option<String> getLastReplicatedTime(String tableName) {
+    // Get the last replicated time from the TBLproperties
+    try {
+      Table database = client.getTable(syncConfig.databaseName, tableName);
+      return 
Option.ofNullable(database.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP,
 null));
+    } catch (NoSuchObjectException e) {
+      LOG.warn("the said table not found in hms " + syncConfig.databaseName + 
"." + tableName);
+      return Option.empty();
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get the last replicated 
time from the database", e);
+    }
+  }
+
+  public void updateLastReplicatedTimeStamp(String tableName, String 
timeStamp) {
+    if (!activeTimeline.filterCompletedInstants().getInstants()
+            .anyMatch(i -> i.getTimestamp().equals(timeStamp))) {
+      throw new HoodieHiveSyncException(
+          "Not a valid completed timestamp " + timeStamp + " for table " + 
tableName);
+    }
+    try {
+      Table table = client.getTable(syncConfig.databaseName, tableName);
+      table.putToParameters(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
+      client.alter_table(syncConfig.databaseName, tableName, table);
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException(
+          "Failed to update last replicated time to " + timeStamp + " for " + 
tableName, e);
+    }
+  }
+
+  public void deleteLastReplicatedTimeStamp(String tableName) {
+    try {
+      Table table = client.getTable(syncConfig.databaseName, tableName);
+      String timestamp = 
table.getParameters().remove(GLOBALLY_CONSISTENT_READ_TIMESTAMP);
+      client.alter_table(syncConfig.databaseName, tableName, table);
+      if (timestamp != null) {
+        LOG.info("deleted last replicated timestamp " + timestamp + " for 
table " + tableName);
+      }
+    } catch (NoSuchObjectException e) {
+      // this is ok the table doesn't even exist.
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException(
+          "Failed to delete last replicated timestamp for " + tableName, e);
+    }
+  }
+
   public void close() {
     try {
       if (connection != null) {
         connection.close();
       }
       if (client != null) {
-        Hive.closeCurrent();
+        client.close();
         client = null;
       }
     } catch (SQLException e) {
@@ -506,4 +566,4 @@ public class HoodieHiveClient extends 
AbstractSyncHoodieClient {
       throw new HoodieHiveSyncException("Failed to get update last commit time 
synced to " + lastCommitSynced, e);
     }
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
new file mode 100644
index 0000000..19074c8
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import com.beust.jcommander.Parameter;
+import org.apache.hudi.hive.HiveSyncConfig;
+
+public class GlobalHiveSyncConfig extends HiveSyncConfig {
+  @Parameter(names = {"--replicated-timestamp"}, description = "Add globally 
replicated timestamp to enable consistent reads across clusters")
+  public String globallyReplicatedTimeStamp;
+
+  public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) {
+    GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig();
+    newConfig.basePath = cfg.basePath;
+    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
+    newConfig.databaseName = cfg.databaseName;
+    newConfig.hivePass = cfg.hivePass;
+    newConfig.hiveUser = cfg.hiveUser;
+    newConfig.partitionFields = cfg.partitionFields;
+    newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
+    newConfig.jdbcUrl = cfg.jdbcUrl;
+    newConfig.tableName = cfg.tableName;
+    newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+    newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+    newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
+    newConfig.supportTimestamp = cfg.supportTimestamp;
+    newConfig.decodePartition = cfg.decodePartition;
+    newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp;
+    return newConfig;
+  }
+
+  @Override
+  public String toString() {
+    return "GlobalHiveSyncConfig{" + super.toString()
+        + " globallyReplicatedTimeStamp=" + globallyReplicatedTimeStamp + "}";
+  }
+
+}
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
new file mode 100644
index 0000000..19c23b7
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hive.HiveSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GlobalHiveSyncTool extends HiveSyncTool {
+
+  private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
+
+  public GlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf configuration, 
FileSystem fs) {
+    super(cfg, configuration, fs);
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    switch (hoodieHiveClient.getTableType()) {
+      case COPY_ON_WRITE:
+        syncHoodieTable(snapshotTableName, false, false);
+        break;
+      case MERGE_ON_READ:
+        // sync a RO table for MOR
+        syncHoodieTable(roTableName.get(), false, true);
+        // sync a RT table for MOR
+        syncHoodieTable(snapshotTableName, true, false);
+        break;
+      default:
+        LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+        throw new InvalidTableException(hoodieHiveClient.getBasePath());
+    }
+  }
+
+  @Override
+  protected void syncHoodieTable(String tableName, boolean 
useRealtimeInputFormat, boolean readAsOptimized) {
+    super.syncHoodieTable(tableName, useRealtimeInputFormat, readAsOptimized);
+    if (((GlobalHiveSyncConfig)cfg).globallyReplicatedTimeStamp != null) {
+      hoodieHiveClient.updateLastReplicatedTimeStamp(tableName,
+          ((GlobalHiveSyncConfig) cfg).globallyReplicatedTimeStamp);
+    }
+    LOG.info("Sync complete for " + tableName);
+  }
+
+  public void close() {
+    hoodieHiveClient.close();
+  }
+
+  public Map<String, Option<String>> getLastReplicatedTimeStampMap() {
+    Map<String, Option<String>> timeStampMap = new HashMap<>();
+    Option<String> timeStamp = 
hoodieHiveClient.getLastReplicatedTime(snapshotTableName);
+    timeStampMap.put(snapshotTableName, timeStamp);
+    if (HoodieTableType.MERGE_ON_READ.equals(hoodieHiveClient.getTableType())) 
{
+      Option<String> roTimeStamp = 
hoodieHiveClient.getLastReplicatedTime(roTableName.get());
+      timeStampMap.put(roTableName.get(), roTimeStamp);
+    }
+    return timeStampMap;
+  }
+
+  public void setLastReplicatedTimeStamp(Map<String, Option<String>> 
timeStampMap) {
+    for (String tableName : timeStampMap.keySet()) {
+      Option<String> timestamp = timeStampMap.get(tableName);
+      if (timestamp.isPresent()) {
+        hoodieHiveClient.updateLastReplicatedTimeStamp(tableName, 
timestamp.get());
+        LOG.info("updated timestamp for " + tableName + " to: " + 
timestamp.get());
+      } else {
+        hoodieHiveClient.deleteLastReplicatedTimeStamp(tableName);
+        LOG.info("deleted timestamp for " + tableName);
+      }
+    }
+  }
+
+  public static GlobalHiveSyncTool 
buildGlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf hiveConf) {
+    FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+    hiveConf.addResource(fs.getConf());
+    return new GlobalHiveSyncTool(cfg, hiveConf, fs);
+  }
+}
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommit.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommit.java
new file mode 100644
index 0000000..ad8d03d
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommit.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+/**
+ * A interface to allow syncing the Hudi table to all clusters.
+ */
/**
 * An interface for committing (and, on failure, rolling back) a synced Hudi table state
 * across all participating clusters.
 */
public interface HiveSyncGlobalCommit {

  /**
   * Attempts to sync the replicated state to every cluster.
   *
   * @return whether the commit succeeded to all the clusters.
   */
  boolean commit();

  /**
   * Attempts to restore the previously committed state on every cluster.
   *
   * @return boolean whether the rollback succeeded to all the clusters.
   */
  boolean rollback();
}
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
new file mode 100644
index 0000000..bce84e9
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import com.beust.jcommander.Parameter;
+
+import com.beust.jcommander.Parameters;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Properties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+// TODO: stop extending HiveSyncConfig and take all the variables needed from 
config file
+@Parameters(commandDescription = "A tool to sync the hudi table to hive from 
different clusters. Similar to HiveSyncTool but syncs it to more"
+    + "than one hive cluster ( currently a local and remote cluster). The 
common timestamp that was synced is stored as a new table property "
+    + "This is most useful when we want to ensure that across different hive 
clusters we want ensure consistent reads. If that is not a requirement"
+    + "then it is better to run HiveSyncTool separately."
+    + "Note: "
+    + "  The tool tries to be transactional but does not guarantee it. If the 
sync fails midway in one cluster it will try to roll back the committed "
+    + "  timestamp from already successful sync on other clusters but that can 
also fail."
+    + "  The tool does not roll back any synced partitions but only the 
timestamp.")
+public class HiveSyncGlobalCommitConfig extends GlobalHiveSyncConfig {
+
+  private static final Logger LOG = 
LogManager.getLogger(HiveSyncGlobalCommitConfig.class);
+
+  public static String LOCAL_HIVE_SITE_URI = 
"hivesyncglobal.local_hive_site_uri";
+  public static String REMOTE_HIVE_SITE_URI = 
"hivesyncglobal.remote_hive_site_uri";
+  public static String CONFIG_FILE_URI = "hivesyncglobal.config_file_uri";
+  public static String REMOTE_BASE_PATH = "hivesyncglobal.remote_base_path";
+  public static String LOCAL_BASE_PATH = "hivesyncglobal.local_base_path";
+  public static String RETRY_ATTEMPTS = "hivesyncglobal.retry_attempts";
+  public static String REMOTE_HIVE_SERVER_JDBC_URLS = 
"hivesyncglobal.remote_hs2_jdbc_urls";
+  public static String LOCAL_HIVE_SERVER_JDBC_URLS = 
"hivesyncglobal.local_hs2_jdbc_urls";
+
+  @Parameter(names = {
+      "--config-xml-file"}, description = "path to the config file in Hive", 
required = true)
+  public String configFile;
+
+  public Properties properties = new Properties();
+
+  private boolean finalize = false;
+
+  public void load() throws IOException {
+    if (finalize) {
+      throw new RuntimeException("trying to modify finalized config");
+    }
+    finalize = true;
+    try (InputStream configStream = new FileInputStream(new File(configFile))) 
{
+      properties.loadFromXML(configStream);
+    }
+    if (StringUtils.isNullOrEmpty(globallyReplicatedTimeStamp)) {
+      throw new RuntimeException("globally replicated timestamp not set");
+    }
+  }
+
+  GlobalHiveSyncConfig mkGlobalHiveSyncConfig(boolean forRemote) {
+    GlobalHiveSyncConfig cfg = GlobalHiveSyncConfig.copy(this);
+    cfg.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
+        : properties.getProperty(LOCAL_BASE_PATH, cfg.basePath);
+    cfg.jdbcUrl = forRemote ? 
properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
+        : properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.jdbcUrl);
+    LOG.info("building hivesync config forRemote: " + forRemote + " " + 
cfg.jdbcUrl + " "
+        + cfg.basePath);
+    return cfg;
+  }
+
+  @Override
+  public String toString() {
+    return "HiveSyncGlobalCommitConfig{ " + "configFile=" + configFile + ", 
properties="
+        + properties + ", " + super.toString()
+        + " }";
+  }
+
+  public void storeToXML(OutputStream configStream) throws IOException {
+    this.properties.storeToXML(configStream, "hivesync global config");
+  }
+}
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
new file mode 100644
index 0000000..a194eeb
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
+
+import com.beust.jcommander.JCommander;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HiveSyncGlobalCommitTool implements HiveSyncGlobalCommit, 
AutoCloseable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HiveSyncGlobalCommitTool.class);
+  private final HiveSyncGlobalCommitConfig config;
+  private List<ReplicationStateSync> replicationStateSyncList;
+
+  private ReplicationStateSync getReplicatedState(boolean forRemote) {
+    HiveConf hiveConf = new HiveConf();
+    // we probably just need to set the metastore URIs
+    // TODO: figure out how to integrate this in production
+    // how to load balance between piper HMS,HS2
+    // if we have list of uris, we can do something similar to createHiveConf 
in reairsync
+    hiveConf.addResource(new Path(config.properties.getProperty(
+        forRemote ? REMOTE_HIVE_SITE_URI : LOCAL_HIVE_SITE_URI)));
+    // TODO: get clusterId as input parameters
+    ReplicationStateSync state = new 
ReplicationStateSync(config.mkGlobalHiveSyncConfig(forRemote),
+        hiveConf, forRemote ? "REMOTESYNC" : "LOCALSYNC");
+    return state;
+  }
+
+  @Override
+  public boolean commit() {
+    // TODO: add retry attempts
+    String name = Thread.currentThread().getName();
+    try {
+      for (ReplicationStateSync stateSync : replicationStateSyncList) {
+        Thread.currentThread().setName(stateSync.getClusterId());
+        LOG.info("starting sync for state " + stateSync);
+        stateSync.sync();
+        LOG.info("synced state " + stateSync);
+      }
+    } catch (Exception e) {
+      Thread.currentThread().setName(name);
+      LOG.error(String.format("Error while trying to commit replication state 
%s", e.getMessage()), e);
+      return false;
+    } finally {
+      Thread.currentThread().setName(name);
+    }
+
+    LOG.info("done syncing to all tables, verifying the timestamps...");
+    ReplicationStateSync base = replicationStateSyncList.get(0);
+    boolean success = true;
+    LOG.info("expecting all timestamps to be similar to: " + base);
+    for (int idx = 1; idx < replicationStateSyncList.size(); ++idx) {
+      ReplicationStateSync other = replicationStateSyncList.get(idx);
+      if (!base.replicationStateIsInSync(other)) {
+        LOG.error("the timestamp of other : " + other + " is not matching with 
base: " + base);
+        success = false;
+      }
+    }
+    return success;
+  }
+
+  @Override
+  public boolean rollback() {
+    for (ReplicationStateSync stateSync : replicationStateSyncList) {
+      stateSync.rollback();
+    }
+    return true;
+  }
+
+  public HiveSyncGlobalCommitTool(HiveSyncGlobalCommitConfig config) {
+    this.config = config;
+    this.replicationStateSyncList = new ArrayList<>(2);
+    this.replicationStateSyncList.add(getReplicatedState(false));
+    this.replicationStateSyncList.add(getReplicatedState(true));
+  }
+
+  private static HiveSyncGlobalCommitConfig 
getHiveSyncGlobalCommitConfig(String[] args)
+      throws IOException {
+    HiveSyncGlobalCommitConfig cfg = new HiveSyncGlobalCommitConfig();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    cfg.load();
+    return cfg;
+  }
+
+  @Override
+  public void close() {
+    for (ReplicationStateSync stateSync : replicationStateSyncList) {
+      stateSync.close();
+    }
+  }
+
+  public static void main(String[] args) throws IOException, 
HoodieHiveSyncException {
+    final HiveSyncGlobalCommitConfig cfg = getHiveSyncGlobalCommitConfig(args);
+    try (final HiveSyncGlobalCommitTool globalCommitTool = new 
HiveSyncGlobalCommitTool(cfg)) {
+      boolean success = globalCommitTool.commit();
+      if (!success) {
+        if (!globalCommitTool.rollback()) {
+          throw new RuntimeException("not able to rollback failed commit");
+        }
+      }
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException(
+          "not able to commit replicated timestamp", e);
+    }
+  }
+}
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
new file mode 100644
index 0000000..bf806fe
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hudi.common.util.Option;
+
/**
 * Tracks the replicated-timestamp state of one cluster and drives the sync/rollback of
 * that state via a {@link GlobalHiveSyncTool}. One instance exists per cluster
 * (identified by {@code clusterId}).
 */
public class ReplicationStateSync {

  // Nulled out by close(); callers must not use this instance after close().
  private GlobalHiveSyncTool globalHiveSyncTool;
  private final GlobalHiveSyncConfig globalHiveSyncConfig;
  private final HiveConf hiveConf;
  // Timestamps observed after the most recent refresh/sync, keyed by table name.
  private Map<String, Option<String>> replicatedTimeStampMap;
  // Snapshot taken just before a sync; used as the rollback target. Null when there is
  // nothing to roll back.
  private Map<String, Option<String>> oldReplicatedTimeStampMap;
  private final String clusterId;

  /**
   * @param conf     per-cluster sync config
   * @param hiveConf Hive configuration for the cluster
   * @param uid      identifier used in logs and thread names for this cluster
   */
  ReplicationStateSync(GlobalHiveSyncConfig conf, HiveConf hiveConf, String uid) {
    this.globalHiveSyncConfig = conf;
    this.hiveConf = hiveConf;
    initGlobalHiveSyncTool();
    replicatedTimeStampMap = globalHiveSyncTool.getLastReplicatedTimeStampMap();
    clusterId = uid;
  }

  // Builds the underlying sync tool from the stored config/conf.
  private void initGlobalHiveSyncTool() {
    globalHiveSyncTool = GlobalHiveSyncTool.buildGlobalHiveSyncTool(globalHiveSyncConfig, hiveConf);
  }

  /**
   * Runs a hive sync for this cluster, recording the pre-sync timestamps first so a
   * later {@link #rollback()} can restore them.
   *
   * @throws Exception if the sync fails; the pre-sync snapshot is still retained for rollback
   */
  public void sync() throws Exception {
    // the cluster maybe down by the time we reach here so we refresh our replication
    // state right before we set the oldReplicatedTimeStamp to narrow this window. this is a
    // liveliness check right before we start.
    replicatedTimeStampMap = globalHiveSyncTool.getLastReplicatedTimeStampMap();
    // it is possible sync fails midway and corrupts the table property therefore we should set
    // the oldReplicatedTimeStampMap before the sync start so that we attempt to rollback
    // this will help in scenario where sync failed due to some bug in hivesync but in case where
    // cluster went down halfway through or before sync in this case rollback may also fail and
    // that is ok and we want to be alerted to such scenarios.
    oldReplicatedTimeStampMap = replicatedTimeStampMap;
    globalHiveSyncTool.syncHoodieTable();
    replicatedTimeStampMap = globalHiveSyncTool.getLastReplicatedTimeStampMap();
  }

  /**
   * Restores the timestamps captured before the last {@link #sync()}. No-op when no
   * sync has been attempted since the last rollback.
   *
   * @return always true; failures surface as exceptions from the sync tool
   */
  public boolean rollback() {
    if (oldReplicatedTimeStampMap != null) {
      globalHiveSyncTool.setLastReplicatedTimeStamp(oldReplicatedTimeStampMap);
      // Clear the snapshot so a repeated rollback does not rewrite the timestamps.
      oldReplicatedTimeStampMap = null;
    }
    return true;
  }

  /**
   * @return true when this cluster's current replicated timestamps (re-read from the
   *         metastore) match the other cluster's
   */
  public boolean replicationStateIsInSync(ReplicationStateSync other) {
    return globalHiveSyncTool.getLastReplicatedTimeStampMap()
        .equals(other.globalHiveSyncTool.getLastReplicatedTimeStampMap());
  }

  @Override
  public String toString() {
    return  "{ clusterId: " + clusterId + " replicatedState: " + replicatedTimeStampMap + " }";
  }

  public String getClusterId() {
    return clusterId;
  }

  /** Releases the underlying sync tool; safe to call more than once. */
  public void close() {
    if (globalHiveSyncTool != null) {
      globalHiveSyncTool.close();
      globalHiveSyncTool = null;
    }
  }

}
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
new file mode 100644
index 0000000..9372433
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive;
+
+import static 
org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
+
+import java.util.Collections;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
+import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
+import org.apache.hudi.hive.testutils.TestCluster;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestHiveSyncGlobalCommitTool {
+
+  @RegisterExtension
+  public static TestCluster localCluster = new TestCluster();
+  @RegisterExtension
+  public static TestCluster remoteCluster = new TestCluster();
+
+  private static String DB_NAME = "foo";
+  private static String TBL_NAME = "bar";
+
+  private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
+      String commitTime, String dbName, String tblName) throws Exception {
+    HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
+    config.properties.setProperty(LOCAL_HIVE_SITE_URI, 
localCluster.getHiveSiteXmlLocation());
+    config.properties.setProperty(REMOTE_HIVE_SITE_URI, 
remoteCluster.getHiveSiteXmlLocation());
+    config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, 
localCluster.getHiveJdBcUrl());
+    config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, 
remoteCluster.getHiveJdBcUrl());
+    config.properties.setProperty(LOCAL_BASE_PATH, 
localCluster.tablePath(dbName, tblName));
+    config.properties.setProperty(REMOTE_BASE_PATH, 
remoteCluster.tablePath(dbName, tblName));
+    config.globallyReplicatedTimeStamp = commitTime;
+    config.hiveUser = System.getProperty("user.name");
+    config.hivePass = "";
+    config.databaseName = dbName;
+    config.tableName = tblName;
+    config.basePath = localCluster.tablePath(dbName, tblName);
+    config.assumeDatePartitioning = true;
+    config.usePreApacheInputFormat = false;
+    config.partitionFields = Collections.singletonList("datestr");
+    return config;
+  }
+
+  private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig 
config) throws Exception {
+    Assertions.assertEquals(localCluster.getHMSClient()
+        .getTable(config.databaseName, config.tableName).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), remoteCluster.getHMSClient()
+        .getTable(config.databaseName, config.tableName).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), "compare replicated 
timestamps");
+  }
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    localCluster.forceCreateDb(DB_NAME);
+    remoteCluster.forceCreateDb(DB_NAME);
+    localCluster.dfsCluster.getFileSystem().delete(new 
Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
+    remoteCluster.dfsCluster.getFileSystem().delete(new 
Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
+  }
+
+  @AfterEach
+  public void clear() throws Exception {
+    localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
+    remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
+  }
+
+  @Test
+  public void testBasicGlobalCommit() throws Exception {
+    String commitTime = "100";
+    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+    // simulate drs
+    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+    HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, 
DB_NAME, TBL_NAME);
+    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
+    Assertions.assertTrue(tool.commit());
+    compareEqualLastReplicatedTimeStamp(config);
+  }
+
+  @Test
+  public void testBasicRollback() throws Exception {
+    String commitTime = "100";
+    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+    // simulate drs
+    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+    HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, 
DB_NAME, TBL_NAME);
+    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
+    Assertions.assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, 
TBL_NAME));
+    Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, 
TBL_NAME));
+    // stop the remote cluster hive server to simulate cluster going down
+    remoteCluster.stopHiveServer2();
+    Assertions.assertFalse(tool.commit());
+    Assertions.assertEquals(commitTime, localCluster.getHMSClient()
+        .getTable(config.databaseName, config.tableName).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
+    Assertions.assertTrue(tool.rollback()); // do a rollback
+    Assertions.assertNotEquals(commitTime, localCluster.getHMSClient()
+        .getTable(config.databaseName, config.tableName).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
+    Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, 
TBL_NAME));
+    remoteCluster.startHiveServer2();
+  }
+}
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 4324a64..e88c46a 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,22 +18,24 @@
 
 package org.apache.hudi.hive;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.NetworkTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.hive.util.ConfigUtils;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
 import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
 import 
org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.hive.testutils.HiveTestUtil;
-import org.apache.hudi.hive.util.HiveSchemaUtil;
 
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -48,9 +50,12 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -68,12 +73,12 @@ public class TestHiveSyncTool {
   }
 
   @BeforeEach
-  public void setUp() throws IOException, InterruptedException {
+  public void setUp() throws Exception {
     HiveTestUtil.setUp();
   }
 
   @AfterEach
-  public void teardown() throws IOException {
+  public void teardown() throws Exception {
     HiveTestUtil.clear();
   }
 
@@ -246,6 +251,7 @@ public class TestHiveSyncTool {
     hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
     List<Partition> hivePartitions = 
hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
     List<String> writtenPartitionsSince = 
hiveClient.getPartitionsWrittenToSince(Option.empty());
+    //writtenPartitionsSince.add(newPartition.get(0));
     List<PartitionEvent> partitionEvents = 
hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
     assertEquals(1, partitionEvents.size(), "There should be only one 
partition event");
     assertEquals(PartitionEventType.UPDATE, 
partitionEvents.iterator().next().eventType,
@@ -769,6 +775,136 @@ public class TestHiveSyncTool {
         "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist 
initially");
   }
 
+  private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String 
emptyCommitTime) throws Exception {
+    
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), 
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync 
completes");
+    
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+        hiveClient.getDataSchema().getColumns().size() + 1,
+        "Hive Schema should match the table schema + partition field");
+    assertEquals(1, 
hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),"Table
 partitions should match the number of partitions we wrote");
+    assertEquals(emptyCommitTime,
+        
hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),"The
 last commit that was sycned should be updated in the TBLPROPERTIES");
+
+    // make sure correct schema is picked
+    Schema schema = SchemaTestUtil.getSimpleSchema();
+    for (Field field : schema.getFields()) {
+      assertEquals(field.schema().getType().getName(),
+          
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get(field.name()).toLowerCase(),
+          String.format("Hive Schema Field %s was added", field));
+    }
+    assertEquals("string",
+        
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("datestr").toLowerCase(),
 "Hive Schema Field datestr was added");
+    assertEquals(schema.getFields().size() + 1 + 
HoodieRecord.HOODIE_META_COLUMNS.size(),
+        
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),"Hive 
Schema fields size");
+  }
+
+  @ParameterizedTest
+  @MethodSource("useJdbc")
+  public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean 
useJdbc) throws Exception {
+    HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    final String commitTime = "100";
+    HiveTestUtil.createCOWTable(commitTime, 1, true);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    // create empty commit
+    final String emptyCommitTime = "200";
+    HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime, 
true);
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table
 " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
+
+    HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+  }
+
+  @ParameterizedTest
+  @MethodSource("useJdbc")
+  public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean 
useJdbc) throws Exception {
+    HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    final String commitTime = "100";
+    HiveTestUtil.createCOWTable(commitTime, 1, true);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+    // evolve the schema
+    DateTime dateTime = DateTime.now().plusDays(6);
+    String commitTime2 = "101";
+    HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
+
+    // create empty commit
+    final String emptyCommitTime = "200";
+    HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertFalse(
+        
hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " + 
HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
+
+    HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+    // now delete the evolved commit instant
+    Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + hiveClient.getActiveTimeline().getInstants()
+        .filter(inst -> inst.getTimestamp().equals(commitTime2))
+        .findFirst().get().getFileName());
+    assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
+
+    try {
+      tool.syncHoodieTable();
+    } catch (RuntimeException e) {
+      // we expect the table sync to fail
+    }
+
+    // table should not be synced yet
+    assertFalse(
+        
hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " + 
HiveTestUtil.hiveSyncConfig.tableName + " should not exist at all");
+  }
+
+  @ParameterizedTest
+  @MethodSource("useJdbc")
+  public void 
testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean 
useJdbc) throws Exception {
+    HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    final String commitTime = "100";
+    HiveTestUtil.createCOWTable(commitTime, 1, true);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    // create empty commit
+    final String emptyCommitTime = "200";
+    HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime, 
true);
+    //HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    assertFalse(
+        hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName), 
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist 
initially");
+
+    HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+
+    // evolve the schema
+    DateTime dateTime = DateTime.now().plusDays(6);
+    String commitTime2 = "301";
+    HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
+    //HiveTestUtil.createCommitFileWithSchema(commitMetadata, "400", false); 
// create another empty commit
+    //HiveTestUtil.createCommitFile(commitMetadata, "400"); // create another 
empty commit
+
+    tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    HoodieHiveClient hiveClientLatest = new 
HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), 
HiveTestUtil.fileSystem);
+    // now delete the evolved commit instant
+    Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + hiveClientLatest.getActiveTimeline().getInstants()
+            .filter(inst -> inst.getTimestamp().equals(commitTime2))
+            .findFirst().get().getFileName());
+    assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
+    try {
+      tool.syncHoodieTable();
+    } catch (RuntimeException e) {
+      // we expect the table sync to fail
+    }
+
+    // old sync values should be left intact
+    verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+  }
+
   @ParameterizedTest
   @MethodSource("useJdbc")
   public void testTypeConverter(boolean useJdbc) throws Exception {
@@ -807,5 +943,137 @@ public class TestHiveSyncTool {
         .containsValue("BIGINT"), errorMsg);
     hiveClient.updateHiveSQL(dropTableSql);
   }
+  /*
+  private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String 
emptyCommitTime) throws Exception {
+    
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist 
after sync completes");
+    
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+        hiveClient.getDataSchema().getColumns().size() + 1,
+        "Hive Schema should match the table schema + partition field");
+    assertEquals(1,
+        
hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals("The last commit that was sycned should be updated in the 
TBLPROPERTIES", emptyCommitTime,
+        
hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get());
+
+    // make sure correct schema is picked
+    Schema schema = SchemaTestUtil.getSimpleSchema();
+    for (Field field : schema.getFields()) {
+      assertEquals(String.format("Hive Schema Field %s was added", field), 
field.schema().getType().getName(),
+          
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get(field.name()).toLowerCase());
+    }
+    assertEquals("Hive Schema Field datestr was added", "string",
+        
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("datestr").toLowerCase());
+    assertEquals(schema.getFields().size() + 1,
+        
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+        "Hive Schema fields size");
+  }
+
+  @ParameterizedTest
+  @MethodSource("useJdbc")
+  public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean 
useJdbc) throws Exception {
+    HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    final String commitTime = "100";
+    HiveTestUtil.createCOWTable(commitTime, 1, false);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    // create empty commit
+    final String emptyCommitTime = "200";
+    HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist 
initially");
+
+    HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+  }
+
+  @ParameterizedTest
+  @MethodSource("useJdbc")
+  public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean 
useJdbc) throws Exception {
+    HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    final String commitTime = "100";
+    HiveTestUtil.createCOWTable(commitTime, 1, false);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+    // evolve the schema
+    DateTime dateTime = DateTime.now().plusDays(6);
+    String commitTime2 = "101";
+    HiveTestUtil.addCOWPartitions(1, false, false, dateTime, commitTime2);
+
+    // create empty commit
+    final String emptyCommitTime = "200";
+    HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist 
initially");
+
+    HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+    // now delete the evolved commit instant
+    Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + hiveClient.getActiveTimeline().getInstants()
+        .filter(inst -> inst.getTimestamp().equals(commitTime2))
+        .findFirst().get().getFileName());
+    assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
+
+    try {
+      tool.syncHoodieTable();
+    } catch (RuntimeException e) {
+      // we expect the table sync to fail
+    }
+
+    // table should not be synced yet
+    
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist 
at all");
+  }
+
+  @ParameterizedTest
+  @MethodSource("useJdbc")
+  public void 
testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean 
useJdbc) throws Exception {
+    HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+    final String commitTime = "100";
+    HiveTestUtil.createCOWTable(commitTime, 1, false);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    // create empty commit
+    final String emptyCommitTime = "200";
+    HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+    HoodieHiveClient hiveClient =
+        new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+        "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist 
initially");
+
+    HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+
+    // evolve the schema
+    DateTime dateTime = DateTime.now().plusDays(6);
+    String commitTime2 = "301";
+    HiveTestUtil.addCOWPartitions(1, false, false, dateTime, commitTime2);
+    HiveTestUtil.createCommitFile(commitMetadata, "400"); // create another 
empty commit
+
+    tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, 
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    HoodieHiveClient hiveClientLatest = new 
HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), 
HiveTestUtil.fileSystem);
+    // now delete the evolved commit instant
+    Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + hiveClientLatest.getActiveTimeline().getInstants()
+            .filter(inst -> inst.getTimestamp().equals(commitTime2))
+            .findFirst().get().getFileName());
+    assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
+
+    try {
+      tool.syncHoodieTable();
+    } catch (RuntimeException e) {
+      // we expect the table sync to fail
+    }
 
+    // old sync values should be left intact
+    verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+  }*/
 }
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
index ac083ab..66343bf 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStore;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
@@ -78,6 +79,7 @@ public class HiveTestService {
   private ExecutorService executorService;
   private TServer tServer;
   private HiveServer2 hiveServer;
+  private HiveConf serverConf;
 
   public HiveTestService(Configuration hadoopConf) throws IOException {
     this.workDir = Files.createTempDirectory(System.currentTimeMillis() + 
"-").toFile().getAbsolutePath();
@@ -88,6 +90,14 @@ public class HiveTestService {
     return hadoopConf;
   }
 
+  public TServer getHiveMetaStore() { 
+    return tServer;
+  }
+
+  public HiveConf getServerConf() {
+    return serverConf;
+  }
+
   public HiveServer2 start() throws IOException {
     Objects.requireNonNull(workDir, "The work dir must be set before starting 
cluster.");
 
@@ -102,10 +112,10 @@ public class HiveTestService {
       FileIOUtils.deleteDirectory(file);
     }
 
-    HiveConf serverConf = configureHive(hadoopConf, localHiveLocation);
+    serverConf = configureHive(hadoopConf, localHiveLocation);
 
     executorService = Executors.newSingleThreadExecutor();
-    tServer = startMetaStore(bindIP, metastorePort, serverConf);
+    tServer = startMetaStore(bindIP, serverConf);
 
     serverConf.set("hive.in.test", "true");
     hiveServer = startHiveServer(serverConf);
@@ -116,7 +126,7 @@ public class HiveTestService {
     } else {
       serverHostname = bindIP;
     }
-    if (!waitForServerUp(serverConf, serverHostname, metastorePort, 
CONNECTION_TIMEOUT)) {
+    if (!waitForServerUp(serverConf, serverHostname, CONNECTION_TIMEOUT)) {
       throw new IOException("Waiting for startup of standalone server");
     }
 
@@ -163,9 +173,17 @@ public class HiveTestService {
 
   public HiveConf configureHive(Configuration conf, String localHiveLocation) 
throws IOException {
     conf.set("hive.metastore.local", "false");
-    conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + 
":" + metastorePort);
+    int port = metastorePort;
+    if (conf.get(HiveConf.ConfVars.METASTORE_SERVER_PORT.varname, null) == 
null) {
+      conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
+    } else {
+      port = conf.getInt(ConfVars.METASTORE_SERVER_PORT.varname, 
metastorePort);
+    }
+    if (conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, null) == 
null) {
+      conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
+    }
+    conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + 
":" + port);
     conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
-    conf.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, 
serverPort);
     // The following line to turn of SASL has no effect since HiveAuthFactory 
calls
     // 'new HiveConf()'. This is fixed by 
https://issues.apache.org/jira/browse/HIVE-6657,
     // in Hive 0.14.
@@ -191,8 +209,9 @@ public class HiveTestService {
     return new HiveConf(conf, this.getClass());
   }
 
-  private boolean waitForServerUp(HiveConf serverConf, String hostname, int 
port, int timeout) {
+  private boolean waitForServerUp(HiveConf serverConf, String hostname, int 
timeout) {
     long start = System.currentTimeMillis();
+    int port = serverConf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
     while (true) {
       try {
         new HiveMetaStoreClient(serverConf);
@@ -288,11 +307,12 @@ public class HiveTestService {
     }
   }
 
-  public TServer startMetaStore(String forceBindIP, int port, HiveConf conf) 
throws IOException {
+  public TServer startMetaStore(String forceBindIP, HiveConf conf) throws 
IOException {
     try {
       // Server will create new threads up to max as necessary. After an idle
       // period, it will destory threads to keep the number of threads in the
       // pool to min.
+      int port = conf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
       int minWorkerThreads = 
conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
       int maxWorkerThreads = 
conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
       boolean tcpKeepAlive = 
conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 1d6bfb4..46f95f6 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -123,10 +123,10 @@ public class HiveTestUtil {
   public static void clear() throws IOException {
     fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
     HoodieTableMetaClient.withPropertyBuilder()
-      .setTableType(HoodieTableType.COPY_ON_WRITE)
-      .setTableName(hiveSyncConfig.tableName)
-      .setPayloadClass(HoodieAvroPayload.class)
-      .initTable(configuration, hiveSyncConfig.basePath);
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(hiveSyncConfig.tableName)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .initTable(configuration, hiveSyncConfig.basePath);
 
     HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, 
hiveServer.getHiveConf(), fileSystem);
     for (String tableName : createdTablesSet) {
@@ -158,10 +158,10 @@ public class HiveTestUtil {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
     HoodieTableMetaClient.withPropertyBuilder()
-      .setTableType(HoodieTableType.COPY_ON_WRITE)
-      .setTableName(hiveSyncConfig.tableName)
-      .setPayloadClass(HoodieAvroPayload.class)
-      .initTable(configuration, hiveSyncConfig.basePath);
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(hiveSyncConfig.tableName)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .initTable(configuration, hiveSyncConfig.basePath);
 
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
@@ -173,15 +173,15 @@ public class HiveTestUtil {
   }
 
   public static void createMORTable(String commitTime, String deltaCommitTime, 
int numberOfPartitions,
-      boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
+                                    boolean createDeltaCommit, boolean 
useSchemaFromCommitMetadata)
       throws IOException, URISyntaxException, InterruptedException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
     HoodieTableMetaClient.withPropertyBuilder()
-      .setTableType(HoodieTableType.MERGE_ON_READ)
-      .setTableName(hiveSyncConfig.tableName)
-      .setPayloadClass(HoodieAvroPayload.class)
-      .initTable(configuration, hiveSyncConfig.basePath);
+        .setTableType(HoodieTableType.MERGE_ON_READ)
+        .setTableName(hiveSyncConfig.tableName)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .initTable(configuration, hiveSyncConfig.basePath);
 
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
@@ -189,25 +189,25 @@ public class HiveTestUtil {
     HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, 
true,
         useSchemaFromCommitMetadata, dateTime, commitTime);
     createdTablesSet
-      .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + 
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
+        .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + 
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
     createdTablesSet
         .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + 
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
     HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> 
compactionMetadata.addWriteStat(key, l)));
     addSchemaToCommitMetadata(compactionMetadata, 
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
-                             useSchemaFromCommitMetadata);
+        useSchemaFromCommitMetadata);
     createCompactionCommitFile(compactionMetadata, commitTime);
     if (createDeltaCommit) {
       // Write a delta commit
       HoodieCommitMetadata deltaMetadata = 
createLogFiles(commitMetadata.getPartitionToWriteStats(), true,
-                                                          
useSchemaFromCommitMetadata);
+          useSchemaFromCommitMetadata);
       createDeltaCommitFile(deltaMetadata, deltaCommitTime);
     }
   }
 
   public static void addCOWPartitions(int numberOfPartitions, boolean 
isParquetSchemaSimple,
-      boolean useSchemaFromCommitMetadata, DateTime startFrom, String 
instantTime) throws IOException, URISyntaxException {
+                                      boolean useSchemaFromCommitMetadata, 
DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata =
         createPartitions(numberOfPartitions, isParquetSchemaSimple, 
useSchemaFromCommitMetadata, startFrom, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + 
hiveSyncConfig.tableName);
@@ -215,7 +215,7 @@ public class HiveTestUtil {
   }
 
   public static void addCOWPartition(String partitionPath, boolean 
isParquetSchemaSimple,
-      boolean useSchemaFromCommitMetadata, String instantTime) throws 
IOException, URISyntaxException {
+                                     boolean useSchemaFromCommitMetadata, 
String instantTime) throws IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata =
         createPartition(partitionPath, isParquetSchemaSimple, 
useSchemaFromCommitMetadata, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + 
hiveSyncConfig.tableName);
@@ -223,7 +223,7 @@ public class HiveTestUtil {
   }
 
   public static void addMORPartitions(int numberOfPartitions, boolean 
isParquetSchemaSimple, boolean isLogSchemaSimple,
-      boolean useSchemaFromCommitMetadata, DateTime startFrom, String 
instantTime, String deltaCommitTime)
+                                      boolean useSchemaFromCommitMetadata, 
DateTime startFrom, String instantTime, String deltaCommitTime)
       throws IOException, URISyntaxException, InterruptedException {
     HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, 
isParquetSchemaSimple,
         useSchemaFromCommitMetadata, startFrom, instantTime);
@@ -233,7 +233,7 @@ public class HiveTestUtil {
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> 
compactionMetadata.addWriteStat(key, l)));
     addSchemaToCommitMetadata(compactionMetadata, 
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
-                             useSchemaFromCommitMetadata);
+        useSchemaFromCommitMetadata);
     createCompactionCommitFile(compactionMetadata, instantTime);
     HoodieCommitMetadata deltaMetadata = 
createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple,
         useSchemaFromCommitMetadata);
@@ -241,7 +241,7 @@ public class HiveTestUtil {
   }
 
   private static HoodieCommitMetadata createLogFiles(Map<String, 
List<HoodieWriteStat>> partitionWriteStats,
-      boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
+                                                     boolean 
isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
       throws InterruptedException, IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
     for (Entry<String, List<HoodieWriteStat>> wEntry : 
partitionWriteStats.entrySet()) {
@@ -261,7 +261,7 @@ public class HiveTestUtil {
   }
 
   private static HoodieCommitMetadata createPartitions(int numberOfPartitions, 
boolean isParquetSchemaSimple,
-      boolean useSchemaFromCommitMetadata, DateTime startFrom, String 
instantTime) throws IOException, URISyntaxException {
+                                                       boolean 
useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws 
IOException, URISyntaxException {
     startFrom = startFrom.withTimeAtStartOfDay();
 
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -279,7 +279,7 @@ public class HiveTestUtil {
   }
 
   private static HoodieCommitMetadata createPartition(String partitionPath, 
boolean isParquetSchemaSimple,
-      boolean useSchemaFromCommitMetadata, String instantTime) throws 
IOException, URISyntaxException {
+                                                      boolean 
useSchemaFromCommitMetadata, String instantTime) throws IOException, 
URISyntaxException {
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
     Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
     fileSystem.makeQualified(partPath);
@@ -354,7 +354,7 @@ public class HiveTestUtil {
   }
 
   private static void addSchemaToCommitMetadata(HoodieCommitMetadata 
commitMetadata, boolean isSimpleSchema,
-      boolean useSchemaFromCommitMetadata) throws IOException {
+                                                boolean 
useSchemaFromCommitMetadata) throws IOException {
     if (useSchemaFromCommitMetadata) {
       Schema dataSchema = getTestDataSchema(isSimpleSchema);
       commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, 
dataSchema.toString());
@@ -362,7 +362,7 @@ public class HiveTestUtil {
   }
 
   private static void addSchemaToCommitMetadata(HoodieCommitMetadata 
commitMetadata, String schema,
-      boolean useSchemaFromCommitMetadata) {
+                                                boolean 
useSchemaFromCommitMetadata) {
     if (useSchemaFromCommitMetadata) {
       commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
     }
@@ -374,7 +374,7 @@ public class HiveTestUtil {
     }
   }
 
-  private static void createCommitFile(HoodieCommitMetadata commitMetadata, 
String instantTime) throws IOException {
+  public static void createCommitFile(HoodieCommitMetadata commitMetadata, 
String instantTime) throws IOException {
     byte[] bytes = 
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
     Path fullPath = new Path(hiveSyncConfig.basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
         + HoodieTimeline.makeCommitFileName(instantTime));
@@ -383,6 +383,11 @@ public class HiveTestUtil {
     fsout.close();
   }
 
+  public static void createCommitFileWithSchema(HoodieCommitMetadata 
commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
+    addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true);
+    createCommitFile(commitMetadata, instantTime);
+  }
+
   private static void createCompactionCommitFile(HoodieCommitMetadata 
commitMetadata, String instantTime)
       throws IOException {
     byte[] bytes = 
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
@@ -406,4 +411,4 @@ public class HiveTestUtil {
   public static Set<String> getCreatedTablesSet() {
     return createdTablesSet;
   }
-}
+}
\ No newline at end of file
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java
new file mode 100644
index 0000000..6a077e1
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.testutils;
+
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
+import org.apache.hudi.common.util.FileIOUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.runners.model.InitializationError;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestCluster implements BeforeAllCallback, AfterAllCallback,
+        BeforeEachCallback, AfterEachCallback {
+  private HdfsTestService hdfsTestService;
+  public HiveTestService hiveTestService;
+  private Configuration conf;
+  public HiveServer2 server2;
+  private static volatile int port = 9083;
+  public MiniDFSCluster dfsCluster;
+  DateTimeFormatter dtfOut;
+  public File hiveSiteXml;
+  private IMetaStoreClient client;
+
+  @Override
+  public void beforeAll(ExtensionContext context) throws Exception {
+    setup();
+  }
+
+  @Override
+  public void afterAll(ExtensionContext context) throws Exception {
+    shutDown();
+  }
+
+  @Override
+  public void beforeEach(ExtensionContext context) throws Exception {
+  }
+
+  @Override
+  public void afterEach(ExtensionContext context) throws Exception {
+  }
+
+  public void setup() throws Exception {
+    hdfsTestService = new HdfsTestService();
+    dfsCluster = hdfsTestService.start(true);
+
+    conf = hdfsTestService.getHadoopConf();
+    conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, port++);
+    conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port++);
+    conf.setInt(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, port++);
+    hiveTestService = new HiveTestService(conf);
+    server2 = hiveTestService.start();
+    dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
+    hiveSiteXml = File.createTempFile("hive-site", ".xml");
+    hiveSiteXml.deleteOnExit();
+    try (OutputStream os = new FileOutputStream(hiveSiteXml)) {
+      hiveTestService.getServerConf().writeXml(os);
+    }
+    client = HiveMetaStoreClient.newSynchronizedClient(
+        RetryingMetaStoreClient.getProxy(hiveTestService.getServerConf(), 
true));
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  public String getHiveSiteXmlLocation() {
+    return hiveSiteXml.getAbsolutePath();
+  }
+
+  public IMetaStoreClient getHMSClient() {
+    return client;
+  }
+
+  public String getHiveJdBcUrl() {
+    return "jdbc:hive2://127.0.0.1:" + 
conf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname) + "";
+  }
+
+  public String tablePath(String dbName, String tableName) throws Exception {
+    return dbPath(dbName) + "/" + tableName;
+  }
+
+  private String dbPath(String dbName) throws Exception {
+    return dfsCluster.getFileSystem().getWorkingDirectory().toString() + "/" + 
dbName;
+  }
+
+  public void forceCreateDb(String dbName) throws Exception {
+    try {
+      getHMSClient().dropDatabase(dbName);
+    } catch (NoSuchObjectException e) {
+      System.out.println("db does not exist but its ok " + dbName);
+    }
+    Database db = new Database(dbName, "", dbPath(dbName), new HashMap<>());
+    getHMSClient().createDatabase(db);
+  }
+
+  public void createCOWTable(String commitTime, int numberOfPartitions, String 
dbName, String tableName)
+      throws Exception {
+    String tablePathStr = tablePath(dbName, tableName);
+    Path path = new Path(tablePathStr);
+    FileIOUtils.deleteDirectory(new File(path.toString()));
+    HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(tableName)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .initTable(conf, path.toString());
+    boolean result = dfsCluster.getFileSystem().mkdirs(path);
+    if (!result) {
+      throw new InitializationError("cannot initialize table");
+    }
+    DateTime dateTime = DateTime.now();
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, 
true, dateTime, commitTime, path.toString());
+    createCommitFile(commitMetadata, commitTime, path.toString());
+  }
+
+  private void createCommitFile(HoodieCommitMetadata commitMetadata, String 
commitTime, String basePath) throws IOException {
+    byte[] bytes = 
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
+    Path fullPath = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+        + HoodieTimeline.makeCommitFileName(commitTime));
+    FSDataOutputStream fsout = dfsCluster.getFileSystem().create(fullPath, 
true);
+    fsout.write(bytes);
+    fsout.close();
+  }
+
+  private HoodieCommitMetadata createPartitions(int numberOfPartitions, 
boolean isParquetSchemaSimple,
+      DateTime startFrom, String commitTime, String basePath) throws 
IOException, URISyntaxException {
+    startFrom = startFrom.withTimeAtStartOfDay();
+
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    for (int i = 0; i < numberOfPartitions; i++) {
+      String partitionPath = dtfOut.print(startFrom);
+      Path partPath = new Path(basePath + "/" + partitionPath);
+      dfsCluster.getFileSystem().makeQualified(partPath);
+      dfsCluster.getFileSystem().mkdirs(partPath);
+      List<HoodieWriteStat> writeStats = createTestData(partPath, 
isParquetSchemaSimple, commitTime);
+      startFrom = startFrom.minusDays(1);
+      writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
+    }
+    return commitMetadata;
+  }
+
+  private List<HoodieWriteStat> createTestData(Path partPath, boolean 
isParquetSchemaSimple, String commitTime)
+      throws IOException, URISyntaxException {
+    List<HoodieWriteStat> writeStats = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      // Create 5 files
+      String fileId = UUID.randomUUID().toString();
+      Path filePath = new Path(partPath.toString() + "/" + FSUtils
+          .makeDataFileName(commitTime, "1-0-1", fileId));
+      generateParquetData(filePath, isParquetSchemaSimple);
+      HoodieWriteStat writeStat = new HoodieWriteStat();
+      writeStat.setFileId(fileId);
+      writeStat.setPath(filePath.toString());
+      writeStats.add(writeStat);
+    }
+    return writeStats;
+  }
+
+  @SuppressWarnings({"unchecked", "deprecation"})
+  private void generateParquetData(Path filePath, boolean 
isParquetSchemaSimple)
+      throws IOException, URISyntaxException {
+    Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() 
: SchemaTestUtil.getEvolvedSchema());
+    org.apache.parquet.schema.MessageType parquetSchema = new 
AvroSchemaConverter().convert(schema);
+    BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
+        BloomFilterTypeCode.SIMPLE.name());
+    HoodieAvroWriteSupport writeSupport = new 
HoodieAvroWriteSupport(parquetSchema, schema, filter);
+    ParquetWriter writer = new ParquetWriter(filePath, writeSupport, 
CompressionCodecName.GZIP, 120 * 1024 * 1024,
+        ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, 
ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
+        ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, 
ParquetWriter.DEFAULT_WRITER_VERSION, dfsCluster.getFileSystem().getConf());
+
+    List<IndexedRecord> testRecords = (isParquetSchemaSimple ? 
SchemaTestUtil.generateTestRecords(0, 100)
+        : SchemaTestUtil.generateEvolvedTestRecords(100, 100));
+    testRecords.forEach(s -> {
+      try {
+        writer.write(s);
+      } catch (IOException e) {
+        fail("IOException while writing test records as parquet" + 
e.toString());
+      }
+    });
+    writer.close();
+  }
+
+  public HiveConf getHiveConf() {
+    return server2.getHiveConf();
+  }
+
+  public void stopHiveServer2() {
+    if (server2 != null) {
+      server2.stop();
+      server2 = null;
+    }
+  }
+
+  public void startHiveServer2() {
+    if (server2 == null) {
+      server2 = new HiveServer2();
+      server2.init(hiveTestService.getServerConf());
+      server2.start();
+    }
+  }
+
+  public void shutDown() {
+    stopHiveServer2();
+    client.close();
+    hiveTestService.getHiveMetaStore().stop();
+    hdfsTestService.stop();
+  }
+}
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
new file mode 100644
index 0000000..980374e
--- /dev/null
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.testutils;
+
+import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
+import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static 
org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
+import static 
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHiveSyncGlobalCommitTool {
+
+  TestCluster localCluster;
+  TestCluster remoteCluster;
+
+  private static String DB_NAME = "foo";
+  private static String TBL_NAME = "bar";
+
+  private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
+      String commitTime, String dbName, String tblName) throws Exception {
+    HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
+    config.properties.setProperty(LOCAL_HIVE_SITE_URI, 
localCluster.getHiveSiteXmlLocation());
+    config.properties.setProperty(REMOTE_HIVE_SITE_URI, 
remoteCluster.getHiveSiteXmlLocation());
+    config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, 
localCluster.getHiveJdBcUrl());
+    config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, 
remoteCluster.getHiveJdBcUrl());
+    config.properties.setProperty(LOCAL_BASE_PATH, 
localCluster.tablePath(dbName, tblName));
+    config.properties.setProperty(REMOTE_BASE_PATH, 
remoteCluster.tablePath(dbName, tblName));
+    config.globallyReplicatedTimeStamp = commitTime;
+    config.hiveUser = System.getProperty("user.name");
+    config.hivePass = "";
+    config.databaseName = dbName;
+    config.tableName = tblName;
+    config.basePath = localCluster.tablePath(dbName, tblName);
+    config.assumeDatePartitioning = true;
+    config.usePreApacheInputFormat = false;
+    config.partitionFields = Collections.singletonList("datestr");
+    return config;
+  }
+
+  private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig 
config) throws Exception {
+    assertEquals(localCluster.getHMSClient().getTable(config.databaseName, 
config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
+        remoteCluster.getHMSClient().getTable(config.databaseName, 
config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
+        "compare replicated timestamps");
+  }
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    localCluster = new TestCluster();
+    localCluster.setup();
+    remoteCluster = new TestCluster();
+    remoteCluster.setup();
+    localCluster.forceCreateDb(DB_NAME);
+    remoteCluster.forceCreateDb(DB_NAME);
+    localCluster.dfsCluster.getFileSystem().delete(new 
Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
+    remoteCluster.dfsCluster.getFileSystem().delete(new 
Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
+  }
+
+  @AfterEach
+  public void clear() throws Exception {
+    localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
+    remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
+    localCluster.shutDown();
+    remoteCluster.shutDown();
+  }
+
+  @Test
+  public void testBasicGlobalCommit() throws Exception {
+    String commitTime = "100";
+    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+    // simulate drs
+    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+    HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, 
DB_NAME, TBL_NAME);
+    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
+    assertTrue(tool.commit());
+    compareEqualLastReplicatedTimeStamp(config);
+  }
+
+  @Test
+  public void testBasicRollback() throws Exception {
+    String commitTime = "100";
+    localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+    // simulate drs
+    remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+    HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, 
DB_NAME, TBL_NAME);
+    HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
+    assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
+    assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
+    // stop the remote cluster hive server to simulate cluster going down
+    remoteCluster.stopHiveServer2();
+    assertFalse(tool.commit());
+    assertEquals(commitTime, localCluster.getHMSClient()
+        .getTable(config.databaseName, config.tableName).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
+    assertTrue(tool.rollback()); // do a rollback
+    assertNotEquals(commitTime, localCluster.getHMSClient()
+        .getTable(config.databaseName, config.tableName).getParameters()
+        .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
+    assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
+    remoteCluster.startHiveServer2();
+  }
+}
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml
 
b/hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml
new file mode 100644
index 0000000..e1a8347
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+  -->
+<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
+<properties>
+<comment>hi</comment>
+<entry key="hivesyncglobal.retry_attempts">1</entry>
+<entry 
key="hivesyncglobal.remote_hive_site_uri">/home/hive-remote-site.xml</entry>
+<entry 
key="hivesyncglobal.remote_base_path">hdfs://hadoop-cluster2:9000/tmp/hudi_trips_cow</entry>
+<entry 
key="hivesyncglobal.local_hive_site_uri">/home/hive/packaging/target/apache-hive-2.3.4-uber-51-SNAPSHOT-bin/apache-hive-2.3.4-uber-51-SNAPSHOT-bin/conf/hive-site.xml</entry>
+<entry 
key="hivesyncglobal.remote_hs2_jdbc_urls">jdbc:hive2://hadoop-cluster2:10000/default;transportMode=http;httpPath=hs2</entry>
+</properties>

Reply via email to