This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0fb8556 Add ability to provide multi-region (global) data consistency
across HMS in different regions (#2542)
0fb8556 is described below
commit 0fb8556b0d9274aef650a46bb82a8cf495d4450b
Author: s-sanjay <[email protected]>
AuthorDate: Fri Jun 25 08:56:26 2021 +0530
Add ability to provide multi-region (global) data consistency across HMS in
different regions (#2542)
[global-hive-sync-tool] Add a global hive sync tool to sync a hudi table
across clusters. Add a way to roll back the replicated timestamp if we fail to
sync or if we only partly sync
Co-authored-by: Jagmeet Bali <[email protected]>
---
.../common/bootstrap/index/BootstrapIndex.java | 6 +-
.../hudi/common/table/HoodieTableMetaClient.java | 4 +-
.../apache/hudi/common/util/ReflectionUtils.java | 2 +-
.../org/apache/hudi/hadoop/InputPathHandler.java | 9 +-
.../apache/hudi/hadoop/utils/HoodieHiveUtils.java | 8 +-
.../hudi/hadoop/utils/HoodieInputFormatUtils.java | 1 +
...llyConsistentTimeStampFilteringInputFormat.java | 121 +++++++++
.../hudi/hadoop/TestHoodieParquetInputFormat.java | 4 +-
.../apache/hudi/hadoop/TestInputPathHandler.java | 17 ++
hudi-sync/hudi-hive-sync/pom.xml | 10 +
.../hudi-hive-sync/run_hive_global_commit_tool.sh | 69 +++++
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 1 +
.../java/org/apache/hudi/hive/HiveSyncTool.java | 12 +-
.../org/apache/hudi/hive/HoodieHiveClient.java | 80 +++++-
.../hive/replication/GlobalHiveSyncConfig.java | 54 ++++
.../hudi/hive/replication/GlobalHiveSyncTool.java | 105 ++++++++
.../hive/replication/HiveSyncGlobalCommit.java | 37 +++
.../replication/HiveSyncGlobalCommitConfig.java | 98 +++++++
.../hive/replication/HiveSyncGlobalCommitTool.java | 136 ++++++++++
.../hive/replication/ReplicationStateSync.java | 90 +++++++
.../hudi/hive/TestHiveSyncGlobalCommitTool.java | 128 +++++++++
.../org/apache/hudi/hive/TestHiveSyncTool.java | 286 ++++++++++++++++++++-
.../hudi/hive/testutils/HiveTestService.java | 34 ++-
.../apache/hudi/hive/testutils/HiveTestUtil.java | 59 +++--
.../apache/hudi/hive/testutils/TestCluster.java | 271 +++++++++++++++++++
.../testutils/TestHiveSyncGlobalCommitTool.java | 133 ++++++++++
.../hive-global-commit-config.xml | 27 ++
27 files changed, 1731 insertions(+), 71 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
index 6aafeca..abd3ac5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/BootstrapIndex.java
@@ -21,12 +21,12 @@ package org.apache.hudi.common.bootstrap.index;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ReflectionUtils;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ReflectionUtils;
/**
* Bootstrap Index Interface.
@@ -161,6 +161,6 @@ public abstract class BootstrapIndex implements
Serializable {
public static BootstrapIndex getBootstrapIndex(HoodieTableMetaClient
metaClient) {
return ((BootstrapIndex)(ReflectionUtils.loadClass(
- metaClient.getTableConfig().getBootstrapIndexClass(), metaClient)));
+ metaClient.getTableConfig().getBootstrapIndexClass(), new
Class[]{HoodieTableMetaClient.class}, metaClient)));
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index cf66f16..3285a00 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -98,8 +98,8 @@ public class HoodieTableMetaClient implements Serializable {
private ConsistencyGuardConfig consistencyGuardConfig =
ConsistencyGuardConfig.newBuilder().build();
private HoodieTableMetaClient(Configuration conf, String basePath, boolean
loadActiveTimelineOnLoad,
- ConsistencyGuardConfig consistencyGuardConfig,
Option<TimelineLayoutVersion> layoutVersion,
- String payloadClassName) {
+ ConsistencyGuardConfig consistencyGuardConfig,
Option<TimelineLayoutVersion> layoutVersion,
+ String payloadClassName) {
LOG.info("Loading HoodieTableMetaClient from " + basePath);
this.consistencyGuardConfig = consistencyGuardConfig;
this.hadoopConf = new SerializableConfiguration(conf);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
index 57d11d0..2895f46 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java
@@ -87,7 +87,7 @@ public class ReflectionUtils {
try {
return
getClass(clazz).getConstructor(constructorArgTypes).newInstance(constructorArgs);
} catch (InstantiationException | IllegalAccessException |
InvocationTargetException | NoSuchMethodException e) {
- throw new HoodieException("Unable to instantiate class ", e);
+ throw new HoodieException("Unable to instantiate class " + clazz, e);
}
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
index 0a5055a..f7adf38 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/InputPathHandler.java
@@ -39,6 +39,14 @@ import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableMetaCl
* InputPathHandler takes in a set of input paths and incremental tables list.
Then, classifies the
* input paths to incremental, snapshot paths and non-hoodie paths. This is
then accessed later to
* mutate the JobConf before processing incremental mode queries and snapshot
queries.
+ *
+ * Note: We are adding jobConf of a mapreduce or spark job. The properties in
the jobConf are of two
+ * types: session properties and table properties from metastore. While a
session property is common
+ * for all the tables in a query the table properties are unique per table so
there is no need to
+ * check if it belongs to the table for which the path handler is now
instantiated. The jobConf has
+ * all table properties such as name, last modification time and so on which
are unique to a table.
+ * This class is written in such a way that it can handle multiple tables and
properties unique to
+ * a table but for table level property such check is not required.
*/
public class InputPathHandler {
@@ -63,7 +71,6 @@ public class InputPathHandler {
/**
* Takes in the original InputPaths and classifies each of them into
incremental, snapshot and
* non-hoodie InputPaths. The logic is as follows:
- *
* 1. Check if an inputPath starts with the same basepath as any of the
metadata basepaths we know
* 1a. If yes, this belongs to a Hoodie table that we already know about.
Simply classify this
* as incremental or snapshot - We can get the table name of this
inputPath from the
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
index d9983bd..d5d9d9c 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
@@ -18,13 +18,14 @@
package org.apache.hudi.hadoop.utils;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -73,6 +74,7 @@ public class HoodieHiveUtils {
public static final int MAX_COMMIT_ALL = -1;
public static final int DEFAULT_LEVELS_TO_BASEPATH = 3;
public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING =
Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
+ public static final String GLOBALLY_CONSISTENT_READ_TIMESTAMP =
"last_replication_timestamp";
public static boolean stopAtCompaction(JobContext job, String tableName) {
String compactionPropName =
String.format(HOODIE_STOP_AT_COMPACTION_PATTERN, tableName);
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index b39ee34..26fbdda 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -442,6 +442,7 @@ public class HoodieInputFormatUtils {
}
HoodieTimeline timeline =
HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(),
job, metaClient);
+
HoodieTableFileSystemView fsView =
fsViewCache.computeIfAbsent(metaClient, tableMetaClient ->
FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
tableMetaClient, buildMetadataConfig(job), timeline));
List<HoodieBaseFile> filteredBaseFiles = new ArrayList<>();
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java
new file mode 100644
index 0000000..50e4a6e
--- /dev/null
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestGloballyConsistentTimeStampFilteringInputFormat.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestGloballyConsistentTimeStampFilteringInputFormat
+ extends TestHoodieParquetInputFormat {
+
+ @BeforeEach
+ public void setUp() {
+ super.setUp();
+ }
+
+ @Test
+ public void testInputFormatLoad() throws IOException {
+ super.testInputFormatLoad();
+
+ // set filtering timestamp to 0 now the timeline wont have any commits.
+ InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "0");
+
+ Assertions.assertThrows(HoodieIOException.class, () ->
inputFormat.getSplits(jobConf, 10));
+ Assertions.assertThrows(HoodieIOException.class, () ->
inputFormat.listStatus(jobConf));
+ }
+
+ @Test
+ public void testInputFormatUpdates() throws IOException {
+ super.testInputFormatUpdates();
+
+ // set the globally replicated timestamp to 199 so only 100 is read and
update is ignored.
+ InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "100");
+
+ FileStatus[] files = inputFormat.listStatus(jobConf);
+ assertEquals(10, files.length);
+
+ ensureFilesInCommit("5 files have been updated to commit 200. but should
get filtered out ",
+ files,"200", 0);
+ ensureFilesInCommit("We should see 10 files from commit 100 ", files,
"100", 10);
+ }
+
+ @Override
+ public void testIncrementalSimple() throws IOException {
+ // setting filtering timestamp to zero should not in any way alter the
result of the test which
+ // pulls in zero files due to incremental ts being the actual commit time
+ jobConf.set(HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP, "0");
+ super.testIncrementalSimple();
+ }
+
+ @Override
+ public void testIncrementalWithMultipleCommits() throws IOException {
+ super.testIncrementalWithMultipleCommits();
+
+ // set globally replicated timestamp to 400 so commits from 500, 600 does
not show up
+ InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf, "400");
+ InputFormatTestUtil.setupIncremental(jobConf, "100",
HoodieHiveUtils.MAX_COMMIT_ALL);
+
+ FileStatus[] files = inputFormat.listStatus(jobConf);
+
+ assertEquals(
+ 5, files.length,"Pulling ALL commits from 100, should get us the 3
files from 400 commit, 1 file from 300 "
+ + "commit and 1 file from 200 commit");
+ ensureFilesInCommit("Pulling 3 commits from 100, should get us the 3 files
from 400 commit",
+ files, "400", 3);
+ ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files
from 300 commit",
+ files, "300", 1);
+ ensureFilesInCommit("Pulling 3 commits from 100, should get us the 1 files
from 200 commit",
+ files, "200", 1);
+
+ List<String> commits = Arrays.asList("100", "200", "300", "400", "500",
"600");
+ for (int idx = 0; idx < commits.size(); ++idx) {
+ for (int jdx = 0; jdx < commits.size(); ++jdx) {
+ InputFormatTestUtil.setupIncremental(jobConf, commits.get(idx),
HoodieHiveUtils.MAX_COMMIT_ALL);
+ InputFormatTestUtil.setupSnapshotMaxCommitTimeQueryMode(jobConf,
commits.get(jdx));
+
+ files = inputFormat.listStatus(jobConf);
+
+ if (jdx <= idx) {
+ assertEquals(0, files.length,"all commits should be filtered");
+ } else {
+ // only commits upto the timestamp is allowed
+ for (FileStatus file : files) {
+ String commitTs = FSUtils.getCommitTime(file.getPath().getName());
+ assertTrue(commits.indexOf(commitTs) <= jdx);
+ assertTrue(commits.indexOf(commitTs) > idx);
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index c4fed98..c45c614 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -65,8 +65,8 @@ import static org.junit.jupiter.api.Assertions.fail;
public class TestHoodieParquetInputFormat {
- private HoodieParquetInputFormat inputFormat;
- private JobConf jobConf;
+ protected HoodieParquetInputFormat inputFormat;
+ protected JobConf jobConf;
private final HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
private final String baseFileExtension = baseFileFormat.getFileExtension();
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
index 3a8b197..0287318 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestInputPathHandler.java
@@ -23,10 +23,12 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.jupiter.api.AfterAll;
@@ -169,6 +171,21 @@ public class TestInputPathHandler {
assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths));
}
+ @Test
+ public void testInputPathHandlerWithGloballyReplicatedTimeStamp() throws
IOException {
+ JobConf jobConf = new JobConf();
+ jobConf.set(HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP, "1");
+ inputPathHandler = new InputPathHandler(dfs.getConf(), inputPaths.toArray(
+ new Path[inputPaths.size()]), incrementalTables);
+ List<Path> actualPaths =
inputPathHandler.getGroupedIncrementalPaths().values().stream()
+ .flatMap(List::stream).collect(Collectors.toList());
+ assertTrue(actualComparesToExpected(actualPaths, incrementalPaths));
+ actualPaths = inputPathHandler.getSnapshotPaths();
+ assertTrue(actualComparesToExpected(actualPaths, snapshotPaths));
+ actualPaths = inputPathHandler.getNonHoodieInputPaths();
+ assertTrue(actualComparesToExpected(actualPaths, nonHoodiePaths));
+ }
+
private boolean actualComparesToExpected(List<Path> actualPaths, List<Path>
expectedPaths) {
if (actualPaths.size() != expectedPaths.size()) {
return false;
diff --git a/hudi-sync/hudi-hive-sync/pom.xml b/hudi-sync/hudi-hive-sync/pom.xml
index c44f785..fd63028 100644
--- a/hudi-sync/hudi-hive-sync/pom.xml
+++ b/hudi-sync/hudi-hive-sync/pom.xml
@@ -30,6 +30,8 @@
<properties>
<main.basedir>${project.parent.basedir}</main.basedir>
+
+ <jetty.version>7.6.0.v20120127</jetty.version>
</properties>
<dependencies>
@@ -148,6 +150,14 @@
<scope>test</scope>
</dependency>
+ <!-- Needed for running HiveServer for Tests -->
+ <dependency>
+ <groupId>org.eclipse.jetty.aggregate</groupId>
+ <artifactId>jetty-all</artifactId>
+ <scope>test</scope>
+ <version>${jetty.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
diff --git a/hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh
b/hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh
new file mode 100755
index 0000000..b7e6cd2
--- /dev/null
+++ b/hudi-sync/hudi-hive-sync/run_hive_global_commit_tool.sh
@@ -0,0 +1,69 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# A tool to sync the hudi table to hive from different clusters. Similar to
HiveSyncTool but syncs it to more
+# than one hive cluster ( currently a local and remote cluster). The common
timestamp that was synced is stored as a new table property
+# This is most useful when we want to ensure consistent reads across
different hive clusters. If that is not a requirement
+# then it is better to run HiveSyncTool separately.
+# Note:
+# The tool tries to be transactional but does not guarantee it. If the sync
fails midway in one cluster it will try to roll back the committed
+# timestamp from already successful sync on other clusters but that can also
fail.
+# The tool does not roll back any synced partitions but only the timestamp.
+
+function error_exit {
+ echo "$1" >&2 ## Send message to stderr. Exclude >&2 if you don't want
it that way.
+ exit "${2:-1}" ## Return a code specified by $2 or 1 by default.
+}
+
+if [ -z "${HADOOP_HOME}" ]; then
+ error_exit "Please make sure the environment variable HADOOP_HOME is setup"
+fi
+
+if [ -z "${HIVE_HOME}" ]; then
+ error_exit "Please make sure the environment variable HIVE_HOME is setup"
+fi
+
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+#Ensure we pick the right jar even for hive11 builds
+HUDI_HIVE_UBER_JAR=`ls -c
$DIR/../packaging/hudi-hive-bundle/target/hudi-hive-*.jar | grep -v source |
head -1`
+
+if [ -z "$HADOOP_CONF_DIR" ]; then
+ echo "setting hadoop conf dir"
+ HADOOP_CONF_DIR="${HADOOP_HOME}/etc/hadoop"
+fi
+
+## Include only specific packages from HIVE_HOME/lib to avoid version
mismatches
+HIVE_EXEC=`ls ${HIVE_HOME}/lib/hive-exec-*.jar | tr '\n' ':'`
+HIVE_SERVICE=`ls ${HIVE_HOME}/lib/hive-service-*.jar | grep -v rpc | tr '\n'
':'`
+HIVE_METASTORE=`ls ${HIVE_HOME}/lib/hive-metastore-*.jar | tr '\n' ':'`
+HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | tr '\n' ':'`
+if [ -z "${HIVE_JDBC}" ]; then
+ HIVE_JDBC=`ls ${HIVE_HOME}/lib/hive-jdbc-*.jar | grep -v handler | tr '\n'
':'`
+fi
+HIVE_JACKSON=`ls ${HIVE_HOME}/lib/jackson-*.jar | tr '\n' ':'`
+HIVE_NUCLEUS=`ls ${HIVE_HOME}/lib/datanucleus*.jar | tr '\n' ':'`
+HIVE_JARS=$HIVE_METASTORE:$HIVE_SERVICE:$HIVE_EXEC:$HIVE_JDBC:$HIVE_JACKSON:$HIVE_NUCLEUS
+
+HADOOP_HIVE_JARS=${HIVE_JARS}:${HADOOP_HOME}/share/hadoop/common/*:${HADOOP_HOME}/share/hadoop/mapreduce/*:${HADOOP_HOME}/share/hadoop/hdfs/*:${HADOOP_HOME}/share/hadoop/common/lib/*:${HADOOP_HOME}/share/hadoop/hdfs/lib/*
+
+if ! [ -z "$HIVE_CONF_DIR" ]; then
+ error_exit "Don't set HIVE_CONF_DIR; use config xml file"
+fi
+
+echo "Running Command : java -cp
$HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}lib/*
org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool $@"
+java -cp
$HUDI_HIVE_UBER_JAR:${HADOOP_HIVE_JARS}:${HADOOP_CONF_DIR}:${HIVE_HOME}lib/*
org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool "$@"
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index e4e7962..41c419d 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -104,6 +104,7 @@ public class HiveSyncConfig implements Serializable {
@Parameter(names = {"--decode-partition"}, description = "Decode the
partition value if the partition has encoded during writing")
public Boolean decodePartition = false;
+ // enhance the similar function in child class
public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
newConfig.basePath = cfg.basePath;
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 0dbe97f..7264c8d 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -58,10 +58,10 @@ public class HiveSyncTool extends AbstractSyncTool {
public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
- private final HiveSyncConfig cfg;
- private HoodieHiveClient hoodieHiveClient = null;
- private String snapshotTableName = null;
- private Option<String> roTableName = null;
+ protected final HiveSyncConfig cfg;
+ protected HoodieHiveClient hoodieHiveClient = null;
+ protected String snapshotTableName = null;
+ protected Option<String> roTableName = null;
public HiveSyncTool(HiveSyncConfig cfg, HiveConf configuration, FileSystem
fs) {
super(configuration.getAllProperties(), fs);
@@ -127,8 +127,8 @@ public class HiveSyncTool extends AbstractSyncTool {
}
}
}
-
- private void syncHoodieTable(String tableName, boolean
useRealtimeInputFormat,
+
+ protected void syncHoodieTable(String tableName, boolean
useRealtimeInputFormat,
boolean readAsOptimized) {
LOG.info("Trying to sync hoodie table " + tableName + " with base path " +
hoodieHiveClient.getBasePath()
+ " of type " + hoodieHiveClient.getTableType());
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 3ae94dd..9d02145 100644
---
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -18,12 +18,6 @@
package org.apache.hudi.hive;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -32,15 +26,22 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.schema.MessageType;
@@ -60,6 +61,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+
public class HoodieHiveClient extends AbstractSyncHoodieClient {
private static final String HOODIE_LAST_COMMIT_TIME_SYNC =
"last_commit_time_sync";
@@ -402,7 +405,19 @@ public class HoodieHiveClient extends
AbstractSyncHoodieClient {
closeQuietly(null, stmt);
}
} else {
- updateHiveSQLUsingHiveDriver(s);
+ CommandProcessorResponse response = updateHiveSQLUsingHiveDriver(s);
+ if (response == null) {
+ throw new HoodieHiveSyncException("Failed in executing SQL null
response" + s);
+ }
+ if (response.getResponseCode() != 0) {
+ LOG.error(String.format("Failure in SQL response %s",
response.toString()));
+ if (response.getException() != null) {
+ throw new HoodieHiveSyncException(
+ String.format("Failed in executing SQL %s", s),
response.getException());
+ } else {
+ throw new HoodieHiveSyncException(String.format("Failed in executing
SQL %s", s));
+ }
+ }
}
}
@@ -476,13 +491,58 @@ public class HoodieHiveClient extends
AbstractSyncHoodieClient {
}
}
+ public Option<String> getLastReplicatedTime(String tableName) {
+ // Get the last replicated time from the TBLproperties
+ try {
+ Table database = client.getTable(syncConfig.databaseName, tableName);
+ return
Option.ofNullable(database.getParameters().getOrDefault(GLOBALLY_CONSISTENT_READ_TIMESTAMP,
null));
+ } catch (NoSuchObjectException e) {
+ LOG.warn("the said table not found in hms " + syncConfig.databaseName +
"." + tableName);
+ return Option.empty();
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to get the last replicated
time from the database", e);
+ }
+ }
+
+ public void updateLastReplicatedTimeStamp(String tableName, String
timeStamp) {
+ if (!activeTimeline.filterCompletedInstants().getInstants()
+ .anyMatch(i -> i.getTimestamp().equals(timeStamp))) {
+ throw new HoodieHiveSyncException(
+ "Not a valid completed timestamp " + timeStamp + " for table " +
tableName);
+ }
+ try {
+ Table table = client.getTable(syncConfig.databaseName, tableName);
+ table.putToParameters(GLOBALLY_CONSISTENT_READ_TIMESTAMP, timeStamp);
+ client.alter_table(syncConfig.databaseName, tableName, table);
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException(
+ "Failed to update last replicated time to " + timeStamp + " for " +
tableName, e);
+ }
+ }
+
+ public void deleteLastReplicatedTimeStamp(String tableName) {
+ try {
+ Table table = client.getTable(syncConfig.databaseName, tableName);
+ String timestamp =
table.getParameters().remove(GLOBALLY_CONSISTENT_READ_TIMESTAMP);
+ client.alter_table(syncConfig.databaseName, tableName, table);
+ if (timestamp != null) {
+ LOG.info("deleted last replicated timestamp " + timestamp + " for
table " + tableName);
+ }
+ } catch (NoSuchObjectException e) {
+ // this is ok the table doesn't even exist.
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException(
+ "Failed to delete last replicated timestamp for " + tableName, e);
+ }
+ }
+
public void close() {
try {
if (connection != null) {
connection.close();
}
if (client != null) {
- Hive.closeCurrent();
+ client.close();
client = null;
}
} catch (SQLException e) {
@@ -506,4 +566,4 @@ public class HoodieHiveClient extends
AbstractSyncHoodieClient {
throw new HoodieHiveSyncException("Failed to get update last commit time
synced to " + lastCommitSynced, e);
}
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
new file mode 100644
index 0000000..19074c8
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import com.beust.jcommander.Parameter;
+import org.apache.hudi.hive.HiveSyncConfig;
+
+public class GlobalHiveSyncConfig extends HiveSyncConfig {
+ @Parameter(names = {"--replicated-timestamp"}, description = "Add globally
replicated timestamp to enable consistent reads across clusters")
+ public String globallyReplicatedTimeStamp;
+
+ public static GlobalHiveSyncConfig copy(GlobalHiveSyncConfig cfg) {
+ GlobalHiveSyncConfig newConfig = new GlobalHiveSyncConfig();
+ newConfig.basePath = cfg.basePath;
+ newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
+ newConfig.databaseName = cfg.databaseName;
+ newConfig.hivePass = cfg.hivePass;
+ newConfig.hiveUser = cfg.hiveUser;
+ newConfig.partitionFields = cfg.partitionFields;
+ newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
+ newConfig.jdbcUrl = cfg.jdbcUrl;
+ newConfig.tableName = cfg.tableName;
+ newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+ newConfig.useFileListingFromMetadata = cfg.useFileListingFromMetadata;
+ newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing;
+ newConfig.supportTimestamp = cfg.supportTimestamp;
+ newConfig.decodePartition = cfg.decodePartition;
+ newConfig.globallyReplicatedTimeStamp = cfg.globallyReplicatedTimeStamp;
+ return newConfig;
+ }
+
+ @Override
+ public String toString() {
+ return "GlobalHiveSyncConfig{" + super.toString()
+ + " globallyReplicatedTimeStamp=" + globallyReplicatedTimeStamp + "}";
+ }
+
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
new file mode 100644
index 0000000..19c23b7
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/GlobalHiveSyncTool.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hive.HiveSyncTool;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class GlobalHiveSyncTool extends HiveSyncTool {
+
+ private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
+
+ public GlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf configuration,
FileSystem fs) {
+ super(cfg, configuration, fs);
+ }
+
+ @Override
+ public void syncHoodieTable() {
+ switch (hoodieHiveClient.getTableType()) {
+ case COPY_ON_WRITE:
+ syncHoodieTable(snapshotTableName, false, false);
+ break;
+ case MERGE_ON_READ:
+ // sync a RO table for MOR
+ syncHoodieTable(roTableName.get(), false, true);
+ // sync a RT table for MOR
+ syncHoodieTable(snapshotTableName, true, false);
+ break;
+ default:
+ LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+ throw new InvalidTableException(hoodieHiveClient.getBasePath());
+ }
+ }
+
+ @Override
+ protected void syncHoodieTable(String tableName, boolean
useRealtimeInputFormat, boolean readAsOptimized) {
+ super.syncHoodieTable(tableName, useRealtimeInputFormat, readAsOptimized);
+ if (((GlobalHiveSyncConfig)cfg).globallyReplicatedTimeStamp != null) {
+ hoodieHiveClient.updateLastReplicatedTimeStamp(tableName,
+ ((GlobalHiveSyncConfig) cfg).globallyReplicatedTimeStamp);
+ }
+ LOG.info("Sync complete for " + tableName);
+ }
+
+ public void close() {
+ hoodieHiveClient.close();
+ }
+
+ public Map<String, Option<String>> getLastReplicatedTimeStampMap() {
+ Map<String, Option<String>> timeStampMap = new HashMap<>();
+ Option<String> timeStamp =
hoodieHiveClient.getLastReplicatedTime(snapshotTableName);
+ timeStampMap.put(snapshotTableName, timeStamp);
+ if (HoodieTableType.MERGE_ON_READ.equals(hoodieHiveClient.getTableType()))
{
+ Option<String> roTimeStamp =
hoodieHiveClient.getLastReplicatedTime(roTableName.get());
+ timeStampMap.put(roTableName.get(), roTimeStamp);
+ }
+ return timeStampMap;
+ }
+
+ public void setLastReplicatedTimeStamp(Map<String, Option<String>>
timeStampMap) {
+ for (String tableName : timeStampMap.keySet()) {
+ Option<String> timestamp = timeStampMap.get(tableName);
+ if (timestamp.isPresent()) {
+ hoodieHiveClient.updateLastReplicatedTimeStamp(tableName,
timestamp.get());
+ LOG.info("updated timestamp for " + tableName + " to: " +
timestamp.get());
+ } else {
+ hoodieHiveClient.deleteLastReplicatedTimeStamp(tableName);
+ LOG.info("deleted timestamp for " + tableName);
+ }
+ }
+ }
+
+ public static GlobalHiveSyncTool
buildGlobalHiveSyncTool(GlobalHiveSyncConfig cfg, HiveConf hiveConf) {
+ FileSystem fs = FSUtils.getFs(cfg.basePath, new Configuration());
+ hiveConf.addResource(fs.getConf());
+ return new GlobalHiveSyncTool(cfg, hiveConf, fs);
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommit.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommit.java
new file mode 100644
index 0000000..ad8d03d
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommit.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
/**
 * An interface for committing a Hudi table sync, best-effort transactionally,
 * across multiple Hive clusters.
 */
public interface HiveSyncGlobalCommit {

  /**
   * Attempts to sync the table to every cluster.
   *
   * @return whether the commit succeeded on all the clusters.
   */
  boolean commit();

  /**
   * Attempts to restore the previously committed state on every cluster.
   *
   * @return whether the rollback succeeded on all the clusters.
   */
  boolean rollback();
}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
new file mode 100644
index 0000000..bce84e9
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitConfig.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import com.beust.jcommander.Parameter;
+
+import com.beust.jcommander.Parameters;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Properties;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+// TODO: stop extending HiveSyncConfig and take all the variables needed from
config file
+@Parameters(commandDescription = "A tool to sync the hudi table to hive from
different clusters. Similar to HiveSyncTool but syncs it to more"
+ + "than one hive cluster ( currently a local and remote cluster). The
common timestamp that was synced is stored as a new table property "
+ + "This is most useful when we want to ensure that across different hive
clusters we want ensure consistent reads. If that is not a requirement"
+ + "then it is better to run HiveSyncTool separately."
+ + "Note: "
+ + " The tool tries to be transactional but does not guarantee it. If the
sync fails midway in one cluster it will try to roll back the committed "
+ + " timestamp from already successful sync on other clusters but that can
also fail."
+ + " The tool does not roll back any synced partitions but only the
timestamp.")
+public class HiveSyncGlobalCommitConfig extends GlobalHiveSyncConfig {
+
+ private static final Logger LOG =
LogManager.getLogger(HiveSyncGlobalCommitConfig.class);
+
+ public static String LOCAL_HIVE_SITE_URI =
"hivesyncglobal.local_hive_site_uri";
+ public static String REMOTE_HIVE_SITE_URI =
"hivesyncglobal.remote_hive_site_uri";
+ public static String CONFIG_FILE_URI = "hivesyncglobal.config_file_uri";
+ public static String REMOTE_BASE_PATH = "hivesyncglobal.remote_base_path";
+ public static String LOCAL_BASE_PATH = "hivesyncglobal.local_base_path";
+ public static String RETRY_ATTEMPTS = "hivesyncglobal.retry_attempts";
+ public static String REMOTE_HIVE_SERVER_JDBC_URLS =
"hivesyncglobal.remote_hs2_jdbc_urls";
+ public static String LOCAL_HIVE_SERVER_JDBC_URLS =
"hivesyncglobal.local_hs2_jdbc_urls";
+
+ @Parameter(names = {
+ "--config-xml-file"}, description = "path to the config file in Hive",
required = true)
+ public String configFile;
+
+ public Properties properties = new Properties();
+
+ private boolean finalize = false;
+
+ public void load() throws IOException {
+ if (finalize) {
+ throw new RuntimeException("trying to modify finalized config");
+ }
+ finalize = true;
+ try (InputStream configStream = new FileInputStream(new File(configFile)))
{
+ properties.loadFromXML(configStream);
+ }
+ if (StringUtils.isNullOrEmpty(globallyReplicatedTimeStamp)) {
+ throw new RuntimeException("globally replicated timestamp not set");
+ }
+ }
+
+ GlobalHiveSyncConfig mkGlobalHiveSyncConfig(boolean forRemote) {
+ GlobalHiveSyncConfig cfg = GlobalHiveSyncConfig.copy(this);
+ cfg.basePath = forRemote ? properties.getProperty(REMOTE_BASE_PATH)
+ : properties.getProperty(LOCAL_BASE_PATH, cfg.basePath);
+ cfg.jdbcUrl = forRemote ?
properties.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
+ : properties.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, cfg.jdbcUrl);
+ LOG.info("building hivesync config forRemote: " + forRemote + " " +
cfg.jdbcUrl + " "
+ + cfg.basePath);
+ return cfg;
+ }
+
+ @Override
+ public String toString() {
+ return "HiveSyncGlobalCommitConfig{ " + "configFile=" + configFile + ",
properties="
+ + properties + ", " + super.toString()
+ + " }";
+ }
+
+ public void storeToXML(OutputStream configStream) throws IOException {
+ this.properties.storeToXML(configStream, "hivesync global config");
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
new file mode 100644
index 0000000..a194eeb
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/HiveSyncGlobalCommitTool.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
+
+import com.beust.jcommander.JCommander;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HiveSyncGlobalCommitTool implements HiveSyncGlobalCommit,
AutoCloseable {
+
+ private static final Logger LOG =
LogManager.getLogger(HiveSyncGlobalCommitTool.class);
+ private final HiveSyncGlobalCommitConfig config;
+ private List<ReplicationStateSync> replicationStateSyncList;
+
+ private ReplicationStateSync getReplicatedState(boolean forRemote) {
+ HiveConf hiveConf = new HiveConf();
+ // we probably just need to set the metastore URIs
+ // TODO: figure out how to integrate this in production
+ // how to load balance between piper HMS,HS2
+ // if we have list of uris, we can do something similar to createHiveConf
in reairsync
+ hiveConf.addResource(new Path(config.properties.getProperty(
+ forRemote ? REMOTE_HIVE_SITE_URI : LOCAL_HIVE_SITE_URI)));
+ // TODO: get clusterId as input parameters
+ ReplicationStateSync state = new
ReplicationStateSync(config.mkGlobalHiveSyncConfig(forRemote),
+ hiveConf, forRemote ? "REMOTESYNC" : "LOCALSYNC");
+ return state;
+ }
+
+ @Override
+ public boolean commit() {
+ // TODO: add retry attempts
+ String name = Thread.currentThread().getName();
+ try {
+ for (ReplicationStateSync stateSync : replicationStateSyncList) {
+ Thread.currentThread().setName(stateSync.getClusterId());
+ LOG.info("starting sync for state " + stateSync);
+ stateSync.sync();
+ LOG.info("synced state " + stateSync);
+ }
+ } catch (Exception e) {
+ Thread.currentThread().setName(name);
+ LOG.error(String.format("Error while trying to commit replication state
%s", e.getMessage()), e);
+ return false;
+ } finally {
+ Thread.currentThread().setName(name);
+ }
+
+ LOG.info("done syncing to all tables, verifying the timestamps...");
+ ReplicationStateSync base = replicationStateSyncList.get(0);
+ boolean success = true;
+ LOG.info("expecting all timestamps to be similar to: " + base);
+ for (int idx = 1; idx < replicationStateSyncList.size(); ++idx) {
+ ReplicationStateSync other = replicationStateSyncList.get(idx);
+ if (!base.replicationStateIsInSync(other)) {
+ LOG.error("the timestamp of other : " + other + " is not matching with
base: " + base);
+ success = false;
+ }
+ }
+ return success;
+ }
+
+ @Override
+ public boolean rollback() {
+ for (ReplicationStateSync stateSync : replicationStateSyncList) {
+ stateSync.rollback();
+ }
+ return true;
+ }
+
+ public HiveSyncGlobalCommitTool(HiveSyncGlobalCommitConfig config) {
+ this.config = config;
+ this.replicationStateSyncList = new ArrayList<>(2);
+ this.replicationStateSyncList.add(getReplicatedState(false));
+ this.replicationStateSyncList.add(getReplicatedState(true));
+ }
+
+ private static HiveSyncGlobalCommitConfig
getHiveSyncGlobalCommitConfig(String[] args)
+ throws IOException {
+ HiveSyncGlobalCommitConfig cfg = new HiveSyncGlobalCommitConfig();
+ JCommander cmd = new JCommander(cfg, null, args);
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+ cfg.load();
+ return cfg;
+ }
+
+ @Override
+ public void close() {
+ for (ReplicationStateSync stateSync : replicationStateSyncList) {
+ stateSync.close();
+ }
+ }
+
+ public static void main(String[] args) throws IOException,
HoodieHiveSyncException {
+ final HiveSyncGlobalCommitConfig cfg = getHiveSyncGlobalCommitConfig(args);
+ try (final HiveSyncGlobalCommitTool globalCommitTool = new
HiveSyncGlobalCommitTool(cfg)) {
+ boolean success = globalCommitTool.commit();
+ if (!success) {
+ if (!globalCommitTool.rollback()) {
+ throw new RuntimeException("not able to rollback failed commit");
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException(
+ "not able to commit replicated timestamp", e);
+ }
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
new file mode 100644
index 0000000..bf806fe
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/replication/ReplicationStateSync.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.replication;
+
+import java.util.Map;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hudi.common.util.Option;
+
+public class ReplicationStateSync {
+
+ private GlobalHiveSyncTool globalHiveSyncTool;
+ private final GlobalHiveSyncConfig globalHiveSyncConfig;
+ private final HiveConf hiveConf;
+ private Map<String, Option<String>> replicatedTimeStampMap;
+ private Map<String, Option<String>> oldReplicatedTimeStampMap;
+ private final String clusterId;
+
+ ReplicationStateSync(GlobalHiveSyncConfig conf, HiveConf hiveConf, String
uid) {
+ this.globalHiveSyncConfig = conf;
+ this.hiveConf = hiveConf;
+ initGlobalHiveSyncTool();
+ replicatedTimeStampMap =
globalHiveSyncTool.getLastReplicatedTimeStampMap();
+ clusterId = uid;
+ }
+
+ private void initGlobalHiveSyncTool() {
+ globalHiveSyncTool =
GlobalHiveSyncTool.buildGlobalHiveSyncTool(globalHiveSyncConfig, hiveConf);
+ }
+
+ public void sync() throws Exception {
+ // the cluster maybe down by the time we reach here so we refresh our
replication
+ // state right before we set the oldReplicatedTimeStamp to narrow this
window. this is a
+ // liveliness check right before we start.
+ replicatedTimeStampMap =
globalHiveSyncTool.getLastReplicatedTimeStampMap();
+ // it is possible sync fails midway and corrupts the table property
therefore we should set
+ // the oldReplicatedTimeStampMap before the sync start so that we attempt
to rollback
+ // this will help in scenario where sync failed due to some bug in
hivesync but in case where
+ // cluster went down halfway through or before sync in this case rollback
may also fail and
+ // that is ok and we want to be alerted to such scenarios.
+ oldReplicatedTimeStampMap = replicatedTimeStampMap;
+ globalHiveSyncTool.syncHoodieTable();
+ replicatedTimeStampMap =
globalHiveSyncTool.getLastReplicatedTimeStampMap();
+ }
+
+ public boolean rollback() {
+ if (oldReplicatedTimeStampMap != null) {
+ globalHiveSyncTool.setLastReplicatedTimeStamp(oldReplicatedTimeStampMap);
+ oldReplicatedTimeStampMap = null;
+ }
+ return true;
+ }
+
+ public boolean replicationStateIsInSync(ReplicationStateSync other) {
+ return globalHiveSyncTool.getLastReplicatedTimeStampMap()
+ .equals(other.globalHiveSyncTool.getLastReplicatedTimeStampMap());
+ }
+
+ @Override
+ public String toString() {
+ return "{ clusterId: " + clusterId + " replicatedState: " +
replicatedTimeStampMap + " }";
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public void close() {
+ if (globalHiveSyncTool != null) {
+ globalHiveSyncTool.close();
+ globalHiveSyncTool = null;
+ }
+ }
+
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
new file mode 100644
index 0000000..9372433
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncGlobalCommitTool.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive;
+
+import static
org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
+
+import java.util.Collections;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
+import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
+import org.apache.hudi.hive.testutils.TestCluster;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestHiveSyncGlobalCommitTool {
+
+ @RegisterExtension
+ public static TestCluster localCluster = new TestCluster();
+ @RegisterExtension
+ public static TestCluster remoteCluster = new TestCluster();
+
+ private static String DB_NAME = "foo";
+ private static String TBL_NAME = "bar";
+
+ private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
+ String commitTime, String dbName, String tblName) throws Exception {
+ HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
+ config.properties.setProperty(LOCAL_HIVE_SITE_URI,
localCluster.getHiveSiteXmlLocation());
+ config.properties.setProperty(REMOTE_HIVE_SITE_URI,
remoteCluster.getHiveSiteXmlLocation());
+ config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS,
localCluster.getHiveJdBcUrl());
+ config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS,
remoteCluster.getHiveJdBcUrl());
+ config.properties.setProperty(LOCAL_BASE_PATH,
localCluster.tablePath(dbName, tblName));
+ config.properties.setProperty(REMOTE_BASE_PATH,
remoteCluster.tablePath(dbName, tblName));
+ config.globallyReplicatedTimeStamp = commitTime;
+ config.hiveUser = System.getProperty("user.name");
+ config.hivePass = "";
+ config.databaseName = dbName;
+ config.tableName = tblName;
+ config.basePath = localCluster.tablePath(dbName, tblName);
+ config.assumeDatePartitioning = true;
+ config.usePreApacheInputFormat = false;
+ config.partitionFields = Collections.singletonList("datestr");
+ return config;
+ }
+
+ private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig
config) throws Exception {
+ Assertions.assertEquals(localCluster.getHMSClient()
+ .getTable(config.databaseName, config.tableName).getParameters()
+ .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), remoteCluster.getHMSClient()
+ .getTable(config.databaseName, config.tableName).getParameters()
+ .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP), "compare replicated
timestamps");
+ }
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ localCluster.forceCreateDb(DB_NAME);
+ remoteCluster.forceCreateDb(DB_NAME);
+ localCluster.dfsCluster.getFileSystem().delete(new
Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
+ remoteCluster.dfsCluster.getFileSystem().delete(new
Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
+ }
+
+ @AfterEach
+ public void clear() throws Exception {
+ localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
+ remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
+ }
+
+ @Test
+ public void testBasicGlobalCommit() throws Exception {
+ String commitTime = "100";
+ localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+ // simulate drs
+ remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+ HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime,
DB_NAME, TBL_NAME);
+ HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
+ Assertions.assertTrue(tool.commit());
+ compareEqualLastReplicatedTimeStamp(config);
+ }
+
+ @Test
+ public void testBasicRollback() throws Exception {
+ String commitTime = "100";
+ localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+ // simulate drs
+ remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+ HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime,
DB_NAME, TBL_NAME);
+ HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
+ Assertions.assertFalse(localCluster.getHMSClient().tableExists(DB_NAME,
TBL_NAME));
+ Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME,
TBL_NAME));
+ // stop the remote cluster hive server to simulate cluster going down
+ remoteCluster.stopHiveServer2();
+ Assertions.assertFalse(tool.commit());
+ Assertions.assertEquals(commitTime, localCluster.getHMSClient()
+ .getTable(config.databaseName, config.tableName).getParameters()
+ .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
+ Assertions.assertTrue(tool.rollback()); // do a rollback
+ Assertions.assertNotEquals(commitTime, localCluster.getHMSClient()
+ .getTable(config.databaseName, config.tableName).getParameters()
+ .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
+ Assertions.assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME,
TBL_NAME));
+ remoteCluster.startHiveServer2();
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 4324a64..e88c46a 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,22 +18,24 @@
package org.apache.hudi.hive;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.ConfigUtils;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import
org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
-import org.apache.hudi.hive.testutils.HiveTestUtil;
-import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
@@ -48,9 +50,12 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -68,12 +73,12 @@ public class TestHiveSyncTool {
}
@BeforeEach
- public void setUp() throws IOException, InterruptedException {
+ public void setUp() throws Exception {
HiveTestUtil.setUp();
}
@AfterEach
- public void teardown() throws IOException {
+ public void teardown() throws Exception {
HiveTestUtil.clear();
}
@@ -246,6 +251,7 @@ public class TestHiveSyncTool {
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
List<Partition> hivePartitions =
hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
List<String> writtenPartitionsSince =
hiveClient.getPartitionsWrittenToSince(Option.empty());
+ //writtenPartitionsSince.add(newPartition.get(0));
List<PartitionEvent> partitionEvents =
hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
assertEquals(1, partitionEvents.size(), "There should be only one
partition event");
assertEquals(PartitionEventType.UPDATE,
partitionEvents.iterator().next().eventType,
@@ -769,6 +775,136 @@ public class TestHiveSyncTool {
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist
initially");
}
+ private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String
emptyCommitTime) throws Exception {
+
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync
completes");
+
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+ hiveClient.getDataSchema().getColumns().size() + 1,
+ "Hive Schema should match the table schema + partition field");
+ assertEquals(1,
hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),"Table
partitions should match the number of partitions we wrote");
+ assertEquals(emptyCommitTime,
+
hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),"The
last commit that was sycned should be updated in the TBLPROPERTIES");
+
+ // make sure correct schema is picked
+ Schema schema = SchemaTestUtil.getSimpleSchema();
+ for (Field field : schema.getFields()) {
+ assertEquals(field.schema().getType().getName(),
+
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get(field.name()).toLowerCase(),
+ String.format("Hive Schema Field %s was added", field));
+ }
+ assertEquals("string",
+
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("datestr").toLowerCase(),
"Hive Schema Field datestr was added");
+ assertEquals(schema.getFields().size() + 1 +
HoodieRecord.HOODIE_META_COLUMNS.size(),
+
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),"Hive
Schema fields size");
+ }
+
+ @ParameterizedTest
+ @MethodSource("useJdbc")
+ public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean
useJdbc) throws Exception {
+ HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, true);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ // create empty commit
+ final String emptyCommitTime = "200";
+ HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime,
true);
+ HoodieHiveClient hiveClient =
+ new HoodieHiveClient(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table
" + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
+
+ HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+ }
+
+ @ParameterizedTest
+ @MethodSource("useJdbc")
+ public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean
useJdbc) throws Exception {
+ HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, true);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+ // evolve the schema
+ DateTime dateTime = DateTime.now().plusDays(6);
+ String commitTime2 = "101";
+ HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
+
+ // create empty commit
+ final String emptyCommitTime = "200";
+ HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+
+ HoodieHiveClient hiveClient =
+ new HoodieHiveClient(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ assertFalse(
+
hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " +
HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
+
+ HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+ // now delete the evolved commit instant
+ Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + hiveClient.getActiveTimeline().getInstants()
+ .filter(inst -> inst.getTimestamp().equals(commitTime2))
+ .findFirst().get().getFileName());
+ assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
+
+ try {
+ tool.syncHoodieTable();
+ } catch (RuntimeException e) {
+ // we expect the table sync to fail
+ }
+
+ // table should not be synced yet
+ assertFalse(
+
hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),"Table " +
HiveTestUtil.hiveSyncConfig.tableName + " should not exist at all");
+ }
+
+ @ParameterizedTest
+ @MethodSource("useJdbc")
+ public void
testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean
useJdbc) throws Exception {
+ HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, true);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ // create empty commit
+ final String emptyCommitTime = "200";
+ HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime,
true);
+ //HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+ HoodieHiveClient hiveClient =
+ new HoodieHiveClient(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ assertFalse(
+ hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist
initially");
+
+ HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+
+ // evolve the schema
+ DateTime dateTime = DateTime.now().plusDays(6);
+ String commitTime2 = "301";
+ HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
+ //HiveTestUtil.createCommitFileWithSchema(commitMetadata, "400", false);
// create another empty commit
+ //HiveTestUtil.createCommitFile(commitMetadata, "400"); // create another
empty commit
+
+ tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ HoodieHiveClient hiveClientLatest = new
HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(),
HiveTestUtil.fileSystem);
+ // now delete the evolved commit instant
+ Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + hiveClientLatest.getActiveTimeline().getInstants()
+ .filter(inst -> inst.getTimestamp().equals(commitTime2))
+ .findFirst().get().getFileName());
+ assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
+ try {
+ tool.syncHoodieTable();
+ } catch (RuntimeException e) {
+ // we expect the table sync to fail
+ }
+
+ // old sync values should be left intact
+ verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+ }
+
@ParameterizedTest
@MethodSource("useJdbc")
public void testTypeConverter(boolean useJdbc) throws Exception {
@@ -807,5 +943,137 @@ public class TestHiveSyncTool {
.containsValue("BIGINT"), errorMsg);
hiveClient.updateHiveSQL(dropTableSql);
}
+ /*
+ private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String
emptyCommitTime) throws Exception {
+
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+ "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist
after sync completes");
+
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+ hiveClient.getDataSchema().getColumns().size() + 1,
+ "Hive Schema should match the table schema + partition field");
+ assertEquals(1,
+
hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
+ "Table partitions should match the number of partitions we wrote");
+ assertEquals("The last commit that was sycned should be updated in the
TBLPROPERTIES", emptyCommitTime,
+
hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get());
+
+ // make sure correct schema is picked
+ Schema schema = SchemaTestUtil.getSimpleSchema();
+ for (Field field : schema.getFields()) {
+ assertEquals(String.format("Hive Schema Field %s was added", field),
field.schema().getType().getName(),
+
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get(field.name()).toLowerCase());
+ }
+ assertEquals("Hive Schema Field datestr was added", "string",
+
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("datestr").toLowerCase());
+ assertEquals(schema.getFields().size() + 1,
+
hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
+ "Hive Schema fields size");
+ }
+
+ @ParameterizedTest
+ @MethodSource("useJdbc")
+ public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean
useJdbc) throws Exception {
+ HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, false);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ // create empty commit
+ final String emptyCommitTime = "200";
+ HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+ HoodieHiveClient hiveClient =
+ new HoodieHiveClient(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+ "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist
initially");
+
+ HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+ }
+
+ @ParameterizedTest
+ @MethodSource("useJdbc")
+ public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean
useJdbc) throws Exception {
+ HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, false);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+ // evolve the schema
+ DateTime dateTime = DateTime.now().plusDays(6);
+ String commitTime2 = "101";
+ HiveTestUtil.addCOWPartitions(1, false, false, dateTime, commitTime2);
+
+ // create empty commit
+ final String emptyCommitTime = "200";
+ HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+
+ HoodieHiveClient hiveClient =
+ new HoodieHiveClient(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+ "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist
initially");
+
+ HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+ // now delete the evolved commit instant
+ Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + hiveClient.getActiveTimeline().getInstants()
+ .filter(inst -> inst.getTimestamp().equals(commitTime2))
+ .findFirst().get().getFileName());
+ assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
+
+ try {
+ tool.syncHoodieTable();
+ } catch (RuntimeException e) {
+ // we expect the table sync to fail
+ }
+
+ // table should not be synced yet
+
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+ "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist
at all");
+ }
+
+ @ParameterizedTest
+ @MethodSource("useJdbc")
+ public void
testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean
useJdbc) throws Exception {
+ HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
+ final String commitTime = "100";
+ HiveTestUtil.createCOWTable(commitTime, 1, false);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ // create empty commit
+ final String emptyCommitTime = "200";
+ HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
+ HoodieHiveClient hiveClient =
+ new HoodieHiveClient(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
+ "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist
initially");
+
+ HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+
+ // evolve the schema
+ DateTime dateTime = DateTime.now().plusDays(6);
+ String commitTime2 = "301";
+ HiveTestUtil.addCOWPartitions(1, false, false, dateTime, commitTime2);
+ HiveTestUtil.createCommitFile(commitMetadata, "400"); // create another
empty commit
+
+ tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig,
HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ HoodieHiveClient hiveClientLatest = new
HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(),
HiveTestUtil.fileSystem);
+ // now delete the evolved commit instant
+ Path fullPath = new Path(HiveTestUtil.hiveSyncConfig.basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + hiveClientLatest.getActiveTimeline().getInstants()
+ .filter(inst -> inst.getTimestamp().equals(commitTime2))
+ .findFirst().get().getFileName());
+ assertTrue(HiveTestUtil.fileSystem.delete(fullPath, false));
+
+ try {
+ tool.syncHoodieTable();
+ } catch (RuntimeException e) {
+ // we expect the table sync to fail
+ }
+ // old sync values should be left intact
+ verifyOldParquetFileTest(hiveClient, emptyCommitTime);
+ }*/
}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
index ac083ab..66343bf 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestService.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IHMSHandler;
@@ -78,6 +79,7 @@ public class HiveTestService {
private ExecutorService executorService;
private TServer tServer;
private HiveServer2 hiveServer;
+ private HiveConf serverConf;
public HiveTestService(Configuration hadoopConf) throws IOException {
this.workDir = Files.createTempDirectory(System.currentTimeMillis() +
"-").toFile().getAbsolutePath();
@@ -88,6 +90,14 @@ public class HiveTestService {
return hadoopConf;
}
+ public TServer getHiveMetaStore() {
+ return tServer;
+ }
+
+ public HiveConf getServerConf() {
+ return serverConf;
+ }
+
public HiveServer2 start() throws IOException {
Objects.requireNonNull(workDir, "The work dir must be set before starting
cluster.");
@@ -102,10 +112,10 @@ public class HiveTestService {
FileIOUtils.deleteDirectory(file);
}
- HiveConf serverConf = configureHive(hadoopConf, localHiveLocation);
+ serverConf = configureHive(hadoopConf, localHiveLocation);
executorService = Executors.newSingleThreadExecutor();
- tServer = startMetaStore(bindIP, metastorePort, serverConf);
+ tServer = startMetaStore(bindIP, serverConf);
serverConf.set("hive.in.test", "true");
hiveServer = startHiveServer(serverConf);
@@ -116,7 +126,7 @@ public class HiveTestService {
} else {
serverHostname = bindIP;
}
- if (!waitForServerUp(serverConf, serverHostname, metastorePort,
CONNECTION_TIMEOUT)) {
+ if (!waitForServerUp(serverConf, serverHostname, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for startup of standalone server");
}
@@ -163,9 +173,17 @@ public class HiveTestService {
public HiveConf configureHive(Configuration conf, String localHiveLocation)
throws IOException {
conf.set("hive.metastore.local", "false");
- conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP +
":" + metastorePort);
+ int port = metastorePort;
+ if (conf.get(HiveConf.ConfVars.METASTORE_SERVER_PORT.varname, null) ==
null) {
+ conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, metastorePort);
+ } else {
+ port = conf.getInt(ConfVars.METASTORE_SERVER_PORT.varname,
metastorePort);
+ }
+ if (conf.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, null) ==
null) {
+ conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, serverPort);
+ }
+ conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP +
":" + port);
conf.set(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname, bindIP);
- conf.setInt(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT.varname,
serverPort);
// The following line to turn off SASL has no effect since HiveAuthFactory
calls
// 'new HiveConf()'. This is fixed by
https://issues.apache.org/jira/browse/HIVE-6657,
// in Hive 0.14.
@@ -191,8 +209,9 @@ public class HiveTestService {
return new HiveConf(conf, this.getClass());
}
- private boolean waitForServerUp(HiveConf serverConf, String hostname, int
port, int timeout) {
+ private boolean waitForServerUp(HiveConf serverConf, String hostname, int
timeout) {
long start = System.currentTimeMillis();
+ int port = serverConf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
while (true) {
try {
new HiveMetaStoreClient(serverConf);
@@ -288,11 +307,12 @@ public class HiveTestService {
}
}
- public TServer startMetaStore(String forceBindIP, int port, HiveConf conf)
throws IOException {
+ public TServer startMetaStore(String forceBindIP, HiveConf conf) throws
IOException {
try {
// Server will create new threads up to max as necessary. After an idle
// period, it will destroy threads to keep the number of threads in the
// pool to min.
+ int port = conf.getIntVar(HiveConf.ConfVars.METASTORE_SERVER_PORT);
int minWorkerThreads =
conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMINTHREADS);
int maxWorkerThreads =
conf.getIntVar(HiveConf.ConfVars.METASTORESERVERMAXTHREADS);
boolean tcpKeepAlive =
conf.getBoolVar(HiveConf.ConfVars.METASTORE_TCP_KEEP_ALIVE);
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 1d6bfb4..46f95f6 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -123,10 +123,10 @@ public class HiveTestUtil {
public static void clear() throws IOException {
fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
HoodieTableMetaClient.withPropertyBuilder()
- .setTableType(HoodieTableType.COPY_ON_WRITE)
- .setTableName(hiveSyncConfig.tableName)
- .setPayloadClass(HoodieAvroPayload.class)
- .initTable(configuration, hiveSyncConfig.basePath);
+ .setTableType(HoodieTableType.COPY_ON_WRITE)
+ .setTableName(hiveSyncConfig.tableName)
+ .setPayloadClass(HoodieAvroPayload.class)
+ .initTable(configuration, hiveSyncConfig.basePath);
HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig,
hiveServer.getHiveConf(), fileSystem);
for (String tableName : createdTablesSet) {
@@ -158,10 +158,10 @@ public class HiveTestUtil {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient.withPropertyBuilder()
- .setTableType(HoodieTableType.COPY_ON_WRITE)
- .setTableName(hiveSyncConfig.tableName)
- .setPayloadClass(HoodieAvroPayload.class)
- .initTable(configuration, hiveSyncConfig.basePath);
+ .setTableType(HoodieTableType.COPY_ON_WRITE)
+ .setTableName(hiveSyncConfig.tableName)
+ .setPayloadClass(HoodieAvroPayload.class)
+ .initTable(configuration, hiveSyncConfig.basePath);
boolean result = fileSystem.mkdirs(path);
checkResult(result);
@@ -173,15 +173,15 @@ public class HiveTestUtil {
}
public static void createMORTable(String commitTime, String deltaCommitTime,
int numberOfPartitions,
- boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
+ boolean createDeltaCommit, boolean
useSchemaFromCommitMetadata)
throws IOException, URISyntaxException, InterruptedException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient.withPropertyBuilder()
- .setTableType(HoodieTableType.MERGE_ON_READ)
- .setTableName(hiveSyncConfig.tableName)
- .setPayloadClass(HoodieAvroPayload.class)
- .initTable(configuration, hiveSyncConfig.basePath);
+ .setTableType(HoodieTableType.MERGE_ON_READ)
+ .setTableName(hiveSyncConfig.tableName)
+ .setPayloadClass(HoodieAvroPayload.class)
+ .initTable(configuration, hiveSyncConfig.basePath);
boolean result = fileSystem.mkdirs(path);
checkResult(result);
@@ -189,25 +189,25 @@ public class HiveTestUtil {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions,
true,
useSchemaFromCommitMetadata, dateTime, commitTime);
createdTablesSet
- .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName +
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
+ .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName +
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName +
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l ->
compactionMetadata.addWriteStat(key, l)));
addSchemaToCommitMetadata(compactionMetadata,
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
- useSchemaFromCommitMetadata);
+ useSchemaFromCommitMetadata);
createCompactionCommitFile(compactionMetadata, commitTime);
if (createDeltaCommit) {
// Write a delta commit
HoodieCommitMetadata deltaMetadata =
createLogFiles(commitMetadata.getPartitionToWriteStats(), true,
-
useSchemaFromCommitMetadata);
+ useSchemaFromCommitMetadata);
createDeltaCommitFile(deltaMetadata, deltaCommitTime);
}
}
public static void addCOWPartitions(int numberOfPartitions, boolean
isParquetSchemaSimple,
- boolean useSchemaFromCommitMetadata, DateTime startFrom, String
instantTime) throws IOException, URISyntaxException {
+ boolean useSchemaFromCommitMetadata,
DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartitions(numberOfPartitions, isParquetSchemaSimple,
useSchemaFromCommitMetadata, startFrom, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." +
hiveSyncConfig.tableName);
@@ -215,7 +215,7 @@ public class HiveTestUtil {
}
public static void addCOWPartition(String partitionPath, boolean
isParquetSchemaSimple,
- boolean useSchemaFromCommitMetadata, String instantTime) throws
IOException, URISyntaxException {
+ boolean useSchemaFromCommitMetadata,
String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
createPartition(partitionPath, isParquetSchemaSimple,
useSchemaFromCommitMetadata, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." +
hiveSyncConfig.tableName);
@@ -223,7 +223,7 @@ public class HiveTestUtil {
}
public static void addMORPartitions(int numberOfPartitions, boolean
isParquetSchemaSimple, boolean isLogSchemaSimple,
- boolean useSchemaFromCommitMetadata, DateTime startFrom, String
instantTime, String deltaCommitTime)
+ boolean useSchemaFromCommitMetadata,
DateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions,
isParquetSchemaSimple,
useSchemaFromCommitMetadata, startFrom, instantTime);
@@ -233,7 +233,7 @@ public class HiveTestUtil {
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l ->
compactionMetadata.addWriteStat(key, l)));
addSchemaToCommitMetadata(compactionMetadata,
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
- useSchemaFromCommitMetadata);
+ useSchemaFromCommitMetadata);
createCompactionCommitFile(compactionMetadata, instantTime);
HoodieCommitMetadata deltaMetadata =
createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple,
useSchemaFromCommitMetadata);
@@ -241,7 +241,7 @@ public class HiveTestUtil {
}
private static HoodieCommitMetadata createLogFiles(Map<String,
List<HoodieWriteStat>> partitionWriteStats,
- boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
+ boolean
isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
throws InterruptedException, IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (Entry<String, List<HoodieWriteStat>> wEntry :
partitionWriteStats.entrySet()) {
@@ -261,7 +261,7 @@ public class HiveTestUtil {
}
private static HoodieCommitMetadata createPartitions(int numberOfPartitions,
boolean isParquetSchemaSimple,
- boolean useSchemaFromCommitMetadata, DateTime startFrom, String
instantTime) throws IOException, URISyntaxException {
+ boolean
useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws
IOException, URISyntaxException {
startFrom = startFrom.withTimeAtStartOfDay();
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -279,7 +279,7 @@ public class HiveTestUtil {
}
private static HoodieCommitMetadata createPartition(String partitionPath,
boolean isParquetSchemaSimple,
- boolean useSchemaFromCommitMetadata, String instantTime) throws
IOException, URISyntaxException {
+ boolean
useSchemaFromCommitMetadata, String instantTime) throws IOException,
URISyntaxException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
@@ -354,7 +354,7 @@ public class HiveTestUtil {
}
private static void addSchemaToCommitMetadata(HoodieCommitMetadata
commitMetadata, boolean isSimpleSchema,
- boolean useSchemaFromCommitMetadata) throws IOException {
+ boolean
useSchemaFromCommitMetadata) throws IOException {
if (useSchemaFromCommitMetadata) {
Schema dataSchema = getTestDataSchema(isSimpleSchema);
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY,
dataSchema.toString());
@@ -362,7 +362,7 @@ public class HiveTestUtil {
}
private static void addSchemaToCommitMetadata(HoodieCommitMetadata
commitMetadata, String schema,
- boolean useSchemaFromCommitMetadata) {
+ boolean
useSchemaFromCommitMetadata) {
if (useSchemaFromCommitMetadata) {
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
}
@@ -374,7 +374,7 @@ public class HiveTestUtil {
}
}
- private static void createCommitFile(HoodieCommitMetadata commitMetadata,
String instantTime) throws IOException {
+ public static void createCommitFile(HoodieCommitMetadata commitMetadata,
String instantTime) throws IOException {
byte[] bytes =
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
Path fullPath = new Path(hiveSyncConfig.basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ HoodieTimeline.makeCommitFileName(instantTime));
@@ -383,6 +383,11 @@ public class HiveTestUtil {
fsout.close();
}
+ public static void createCommitFileWithSchema(HoodieCommitMetadata
commitMetadata, String instantTime, boolean isSimpleSchema) throws IOException {
+ addSchemaToCommitMetadata(commitMetadata, isSimpleSchema, true);
+ createCommitFile(commitMetadata, instantTime);
+ }
+
private static void createCompactionCommitFile(HoodieCommitMetadata
commitMetadata, String instantTime)
throws IOException {
byte[] bytes =
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
@@ -406,4 +411,4 @@ public class HiveTestUtil {
public static Set<String> getCreatedTablesSet() {
return createdTablesSet;
}
-}
+}
\ No newline at end of file
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java
new file mode 100644
index 0000000..6a077e1
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestCluster.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.testutils;
+
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
+import org.apache.hudi.common.util.FileIOUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hive.service.server.HiveServer2;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.runners.model.InitializationError;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestCluster implements BeforeAllCallback, AfterAllCallback,
+ BeforeEachCallback, AfterEachCallback {
+ private HdfsTestService hdfsTestService;
+ public HiveTestService hiveTestService;
+ private Configuration conf;
+ public HiveServer2 server2;
+ private static volatile int port = 9083;
+ public MiniDFSCluster dfsCluster;
+ DateTimeFormatter dtfOut;
+ public File hiveSiteXml;
+ private IMetaStoreClient client;
+
+ @Override
+ public void beforeAll(ExtensionContext context) throws Exception {
+ setup();
+ }
+
+ @Override
+ public void afterAll(ExtensionContext context) throws Exception {
+ shutDown();
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) throws Exception {
+ }
+
+ @Override
+ public void afterEach(ExtensionContext context) throws Exception {
+ }
+
+ public void setup() throws Exception {
+ hdfsTestService = new HdfsTestService();
+ dfsCluster = hdfsTestService.start(true);
+
+ conf = hdfsTestService.getHadoopConf();
+ conf.setInt(ConfVars.METASTORE_SERVER_PORT.varname, port++);
+ conf.setInt(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port++);
+ conf.setInt(ConfVars.HIVE_SERVER2_WEBUI_PORT.varname, port++);
+ hiveTestService = new HiveTestService(conf);
+ server2 = hiveTestService.start();
+ dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
+ hiveSiteXml = File.createTempFile("hive-site", ".xml");
+ hiveSiteXml.deleteOnExit();
+ try (OutputStream os = new FileOutputStream(hiveSiteXml)) {
+ hiveTestService.getServerConf().writeXml(os);
+ }
+ client = HiveMetaStoreClient.newSynchronizedClient(
+ RetryingMetaStoreClient.getProxy(hiveTestService.getServerConf(),
true));
+ }
+
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ public String getHiveSiteXmlLocation() {
+ return hiveSiteXml.getAbsolutePath();
+ }
+
+ public IMetaStoreClient getHMSClient() {
+ return client;
+ }
+
+ public String getHiveJdBcUrl() {
+ return "jdbc:hive2://127.0.0.1:" +
conf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname) + "";
+ }
+
+ public String tablePath(String dbName, String tableName) throws Exception {
+ return dbPath(dbName) + "/" + tableName;
+ }
+
+ private String dbPath(String dbName) throws Exception {
+ return dfsCluster.getFileSystem().getWorkingDirectory().toString() + "/" +
dbName;
+ }
+
+ public void forceCreateDb(String dbName) throws Exception {
+ try {
+ getHMSClient().dropDatabase(dbName);
+ } catch (NoSuchObjectException e) {
+ System.out.println("db does not exist but its ok " + dbName);
+ }
+ Database db = new Database(dbName, "", dbPath(dbName), new HashMap<>());
+ getHMSClient().createDatabase(db);
+ }
+
+ public void createCOWTable(String commitTime, int numberOfPartitions, String
dbName, String tableName)
+ throws Exception {
+ String tablePathStr = tablePath(dbName, tableName);
+ Path path = new Path(tablePathStr);
+ FileIOUtils.deleteDirectory(new File(path.toString()));
+ HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(HoodieTableType.COPY_ON_WRITE)
+ .setTableName(tableName)
+ .setPayloadClass(HoodieAvroPayload.class)
+ .initTable(conf, path.toString());
+ boolean result = dfsCluster.getFileSystem().mkdirs(path);
+ if (!result) {
+ throw new InitializationError("cannot initialize table");
+ }
+ DateTime dateTime = DateTime.now();
+ HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions,
true, dateTime, commitTime, path.toString());
+ createCommitFile(commitMetadata, commitTime, path.toString());
+ }
+
+ private void createCommitFile(HoodieCommitMetadata commitMetadata, String
commitTime, String basePath) throws IOException {
+ byte[] bytes =
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
+ Path fullPath = new Path(basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ + HoodieTimeline.makeCommitFileName(commitTime));
+ FSDataOutputStream fsout = dfsCluster.getFileSystem().create(fullPath,
true);
+ fsout.write(bytes);
+ fsout.close();
+ }
+
+ private HoodieCommitMetadata createPartitions(int numberOfPartitions,
boolean isParquetSchemaSimple,
+ DateTime startFrom, String commitTime, String basePath) throws
IOException, URISyntaxException {
+ startFrom = startFrom.withTimeAtStartOfDay();
+
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ for (int i = 0; i < numberOfPartitions; i++) {
+ String partitionPath = dtfOut.print(startFrom);
+ Path partPath = new Path(basePath + "/" + partitionPath);
+ dfsCluster.getFileSystem().makeQualified(partPath);
+ dfsCluster.getFileSystem().mkdirs(partPath);
+ List<HoodieWriteStat> writeStats = createTestData(partPath,
isParquetSchemaSimple, commitTime);
+ startFrom = startFrom.minusDays(1);
+ writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
+ }
+ return commitMetadata;
+ }
+
+ private List<HoodieWriteStat> createTestData(Path partPath, boolean
isParquetSchemaSimple, String commitTime)
+ throws IOException, URISyntaxException {
+ List<HoodieWriteStat> writeStats = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ // Create 5 files
+ String fileId = UUID.randomUUID().toString();
+ Path filePath = new Path(partPath.toString() + "/" + FSUtils
+ .makeDataFileName(commitTime, "1-0-1", fileId));
+ generateParquetData(filePath, isParquetSchemaSimple);
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setFileId(fileId);
+ writeStat.setPath(filePath.toString());
+ writeStats.add(writeStat);
+ }
+ return writeStats;
+ }
+
+ @SuppressWarnings({"unchecked", "deprecation"})
+ private void generateParquetData(Path filePath, boolean
isParquetSchemaSimple)
+ throws IOException, URISyntaxException {
+ Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema()
: SchemaTestUtil.getEvolvedSchema());
+ org.apache.parquet.schema.MessageType parquetSchema = new
AvroSchemaConverter().convert(schema);
+ BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
+ BloomFilterTypeCode.SIMPLE.name());
+ HoodieAvroWriteSupport writeSupport = new
HoodieAvroWriteSupport(parquetSchema, schema, filter);
+ ParquetWriter writer = new ParquetWriter(filePath, writeSupport,
CompressionCodecName.GZIP, 120 * 1024 * 1024,
+ ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE,
ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
+ ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
ParquetWriter.DEFAULT_WRITER_VERSION, dfsCluster.getFileSystem().getConf());
+
+ List<IndexedRecord> testRecords = (isParquetSchemaSimple ?
SchemaTestUtil.generateTestRecords(0, 100)
+ : SchemaTestUtil.generateEvolvedTestRecords(100, 100));
+ testRecords.forEach(s -> {
+ try {
+ writer.write(s);
+ } catch (IOException e) {
+ fail("IOException while writing test records as parquet" +
e.toString());
+ }
+ });
+ writer.close();
+ }
+
+ public HiveConf getHiveConf() {
+ return server2.getHiveConf();
+ }
+
+ public void stopHiveServer2() {
+ if (server2 != null) {
+ server2.stop();
+ server2 = null;
+ }
+ }
+
+ public void startHiveServer2() {
+ if (server2 == null) {
+ server2 = new HiveServer2();
+ server2.init(hiveTestService.getServerConf());
+ server2.start();
+ }
+ }
+
+ public void shutDown() {
+ stopHiveServer2();
+ client.close();
+ hiveTestService.getHiveMetaStore().stop();
+ hdfsTestService.stop();
+ }
+}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
new file mode 100644
index 0000000..980374e
--- /dev/null
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/TestHiveSyncGlobalCommitTool.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hive.testutils;
+
+import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
+import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static
org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
+import static
org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHiveSyncGlobalCommitTool {
+
+ TestCluster localCluster;
+ TestCluster remoteCluster;
+
+ private static String DB_NAME = "foo";
+ private static String TBL_NAME = "bar";
+
+ private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
+ String commitTime, String dbName, String tblName) throws Exception {
+ HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
+ config.properties.setProperty(LOCAL_HIVE_SITE_URI,
localCluster.getHiveSiteXmlLocation());
+ config.properties.setProperty(REMOTE_HIVE_SITE_URI,
remoteCluster.getHiveSiteXmlLocation());
+ config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS,
localCluster.getHiveJdBcUrl());
+ config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS,
remoteCluster.getHiveJdBcUrl());
+ config.properties.setProperty(LOCAL_BASE_PATH,
localCluster.tablePath(dbName, tblName));
+ config.properties.setProperty(REMOTE_BASE_PATH,
remoteCluster.tablePath(dbName, tblName));
+ config.globallyReplicatedTimeStamp = commitTime;
+ config.hiveUser = System.getProperty("user.name");
+ config.hivePass = "";
+ config.databaseName = dbName;
+ config.tableName = tblName;
+ config.basePath = localCluster.tablePath(dbName, tblName);
+ config.assumeDatePartitioning = true;
+ config.usePreApacheInputFormat = false;
+ config.partitionFields = Collections.singletonList("datestr");
+ return config;
+ }
+
+ private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig
config) throws Exception {
+ assertEquals(localCluster.getHMSClient().getTable(config.databaseName,
config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
+ remoteCluster.getHMSClient().getTable(config.databaseName,
config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
+ "compare replicated timestamps");
+ }
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ localCluster = new TestCluster();
+ localCluster.setup();
+ remoteCluster = new TestCluster();
+ remoteCluster.setup();
+ localCluster.forceCreateDb(DB_NAME);
+ remoteCluster.forceCreateDb(DB_NAME);
+ localCluster.dfsCluster.getFileSystem().delete(new
Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
+ remoteCluster.dfsCluster.getFileSystem().delete(new
Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
+ }
+
+ @AfterEach
+ public void clear() throws Exception {
+ localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
+ remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
+ localCluster.shutDown();
+ remoteCluster.shutDown();
+ }
+
+ @Test
+ public void testBasicGlobalCommit() throws Exception {
+ String commitTime = "100";
+ localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+ // simulate drs
+ remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+ HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime,
DB_NAME, TBL_NAME);
+ HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
+ assertTrue(tool.commit());
+ compareEqualLastReplicatedTimeStamp(config);
+ }
+
+ @Test
+ public void testBasicRollback() throws Exception {
+ String commitTime = "100";
+ localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+ // simulate drs
+ remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
+ HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime,
DB_NAME, TBL_NAME);
+ HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
+ assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
+ assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
+ // stop the remote cluster hive server to simulate cluster going down
+ remoteCluster.stopHiveServer2();
+ assertFalse(tool.commit());
+ assertEquals(commitTime, localCluster.getHMSClient()
+ .getTable(config.databaseName, config.tableName).getParameters()
+ .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
+ assertTrue(tool.rollback()); // do a rollback
+ assertNotEquals(commitTime, localCluster.getHMSClient()
+ .getTable(config.databaseName, config.tableName).getParameters()
+ .get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
+ assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
+ remoteCluster.startHiveServer2();
+ }
+}
diff --git
a/hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml
b/hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml
new file mode 100644
index 0000000..e1a8347
--- /dev/null
+++
b/hudi-utilities/src/test/resources/delta-streamer-config/hive-global-commit-config.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
+<properties>
+<comment>Sample configuration for the Hudi global (multi-cluster) hive sync commit tool</comment>
+<entry key="hivesyncglobal.retry_attempts">1</entry>
+<entry
key="hivesyncglobal.remote_hive_site_uri">/home/hive-remote-site.xml</entry>
+<entry
key="hivesyncglobal.remote_base_path">hdfs://hadoop-cluster2:9000/tmp/hudi_trips_cow</entry>
+<entry
key="hivesyncglobal.local_hive_site_uri">/home/hive/packaging/target/apache-hive-2.3.4-uber-51-SNAPSHOT-bin/apache-hive-2.3.4-uber-51-SNAPSHOT-bin/conf/hive-site.xml</entry>
+<entry
key="hivesyncglobal.remote_hs2_jdbc_urls">jdbc:hive2://hadoop-cluster2:10000/default;transportMode=http;httpPath=hs2</entry>
+</properties>