This is an automated email from the ASF dual-hosted git repository.
pravin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a07c3cc HIVE-25708: Implement creation of table_diff. (Ayush Saxena,
reviewed by Pravin Kumar Sinha)
a07c3cc is described below
commit a07c3ccb1bff733ff41f6c38947f406aebc11353
Author: Ayush Saxena <[email protected]>
AuthorDate: Fri Dec 17 10:25:18 2021 +0530
HIVE-25708: Implement creation of table_diff. (Ayush Saxena, reviewed by
Pravin Kumar Sinha)
---
.../java/org/apache/hadoop/hive/ql/ErrorMsg.java | 3 +-
.../parse/TestReplicationOptimisedBootstrap.java | 453 +++++++++++++++++++++
.../parse/TestReplicationScenariosAcidTables.java | 24 --
.../TestReplicationScenariosAcrossInstances.java | 7 +-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 4 +-
.../hive/ql/exec/repl/OptimisedBootstrapUtils.java | 294 +++++++++++++
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 82 +++-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 29 ++
.../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 4 +
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 4 +-
10 files changed, 860 insertions(+), 44 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 6220175..927650f 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -633,7 +633,8 @@ public enum ErrorMsg {
REPL_DISTCP_SNAPSHOT_EXCEPTION(40015, "SNAPSHOT_ERROR", true),
RANGER_AUTHORIZATION_FAILED(40016, "Authorization Failure while
communicating to Ranger admin", true),
RANGER_AUTHENTICATION_FAILED(40017, "Authentication Failure while
communicating to Ranger admin", true),
- REPL_INCOMPATIBLE_EXCEPTION(40018, "Cannot load into database {0} as it is
replication incompatible.", true)
+ REPL_INCOMPATIBLE_EXCEPTION(40018, "Cannot load into database {0} as it is
replication incompatible.", true),
+ REPL_FAILOVER_TARGET_MODIFIED(40019,"Database event id changed post table
diff generation.")
;
private int errorCode;
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
new file mode 100644
index 0000000..d5b819d
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestReplicationOptimisedBootstrap extends
BaseReplicationAcrossInstances {
+
+ String extraPrimaryDb;
+
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ HashMap<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+ overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+ overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname,
"true");
+ overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
UserGroupInformation.getCurrentUser().getUserName());
+
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname,
"true");
+
+ internalBeforeClassSetupExclusiveReplica(overrides, overrides,
TestReplicationOptimisedBootstrap.class);
+ }
+
+ @Before
+ public void setup() throws Throwable {
+ super.setup();
+ extraPrimaryDb = "extra_" + primaryDbName;
+ }
+
+ @After
+ public void tearDown() throws Throwable {
+ primary.run("drop database if exists " + extraPrimaryDb + " cascade");
+ super.tearDown();
+ }
+
+ @Test
+ public void testBuildTableDiffGeneration() throws Throwable {
+ List<String> withClause =
ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
primary.repldDir + "'");
+ // Create two external & two managed tables and do a bootstrap dump & load.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("insert into table t1 values (2),(3),(4)")
+ .run("create external table t2 (place string) partitioned by (country
string)")
+ .run("insert into table t2 partition(country='india') values
('chennai')")
+ .run("insert into table t2 partition(country='us') values ('new
york')")
+ .run("create table t1_managed (id int)")
+ .run("insert into table t1_managed values (10)")
+ .run("insert into table t1_managed values (20),(31),(42)")
+ .run("create table t2_managed (place string) partitioned by (country
string)")
+ .run("insert into table t2_managed partition(country='india') values
('bangalore')")
+ .run("insert into table t2_managed partition(country='us') values
('austin')")
+ .dump(primaryDbName, withClause);
+
+ // Do the bootstrap load and check all the external & managed tables are
present.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("show tables like 't1_managed'")
+ .verifyResult("t1_managed")
+ .run("show tables like 't2_managed'")
+ .verifyResult("t2_managed")
+ .verifyReplTargetProperty(replicatedDbName);
+
+    // Do an incremental dump & load. Add one table which we can drop & an
empty table as well.
+ tuple = primary.run("use " + primaryDbName)
+ .run("create table t5_managed (id int)")
+ .run("insert into table t5_managed values (110)")
+ .run("insert into table t5_managed values (110)")
+ .run("create table t6_managed (id int)")
+ .dump(primaryDbName, withClause);
+
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't5_managed'")
+ .verifyResult("t5_managed")
+ .run("show tables like 't6_managed'")
+ .verifyResult("t6_managed")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do some modifications on other database with similar table names &
some modifications on original source
+ // cluster.
+ primary.run("create database " + extraPrimaryDb)
+ .run("use " + extraPrimaryDb)
+ .run("create external table t1 (id int)")
+ .run("create table t1_managed (id int)")
+ .run("use " + primaryDbName)
+ .run("create external table t4 (id int)")
+ .run("insert into table t4 values (100)")
+ .run("insert into table t4 values (201)")
+ .run("create table t4_managed (id int)")
+ .run("insert into table t4_managed values (110)")
+ .run("insert into table t4_managed values (220)")
+ .run("insert into table t2 partition(country='france') values
('lyon')")
+ .run("insert into table t2_managed partition(country='france') values
('nice')")
+ .run("alter table t6_managed add columns (name string)")
+ .run("drop table t5_managed");
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "1");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
newReplDir + "'");
+
+ // Do a reverse dump
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check the event ack file got created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + "
doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Check in case the dump isn't consumed, and we attempt a dump again,
that gets skipped and the dump directory
+ // doesn't change, without any errors.
+
+ Path dumpPath = new Path(tuple.dumpLocation);
+ ContentSummary beforeContentSummary =
replicaFs.getContentSummary(dumpPath.getParent());
+ WarehouseInstance.Tuple emptyTuple = replica.dump(replicatedDbName,
withClause);
+ assertTrue(emptyTuple.dumpLocation.isEmpty());
+ assertTrue(emptyTuple.lastReplicationId.isEmpty());
+ ContentSummary afterContentSummary =
replicaFs.getContentSummary(dumpPath.getParent());
+ assertEquals(beforeContentSummary.getFileAndDirectoryCount(),
afterContentSummary.getFileAndDirectoryCount());
+
+ // Check the event ack file stays intact, despite having a skipped dump.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString(),
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Do a load, this should create a table_diff_complete directory
+ primary.load(primaryDbName,replicatedDbName, withClause);
+
+ // Check the table diff directory exist.
+ assertTrue(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Check the table diff has all the modified table, including the dropped
and empty ones
+ HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath,
conf);
+ assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+ .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed",
"t5_managed", "t6_managed")));
+
+    // Do a load again and see that nothing changes, as this load isn't consumed.
+ beforeContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+ primary.load(primaryDbName, replicatedDbName, withClause);
+ assertTrue("Table Diff Contains " + tableDiffEntries, tableDiffEntries
+ .containsAll(Arrays.asList("t4", "t2", "t4_managed", "t2_managed",
"t5_managed", "t6_managed")));
+ afterContentSummary = replicaFs.getContentSummary(dumpPath.getParent());
+ assertEquals(beforeContentSummary.getFileAndDirectoryCount(),
afterContentSummary.getFileAndDirectoryCount());
+
+ // Check there are entries in the table files.
+ assertFalse(getPathsFromTableFile("t4", dumpPath, conf).isEmpty());
+ assertFalse(getPathsFromTableFile("t2", dumpPath, conf).isEmpty());
+ assertFalse(getPathsFromTableFile("t4_managed", dumpPath, conf).isEmpty());
+ assertFalse(getPathsFromTableFile("t2_managed", dumpPath, conf).isEmpty());
+
+ // Check the dropped and empty tables.
+ assertTrue(getPathsFromTableFile("t5_managed", dumpPath, conf).isEmpty());
+ assertTrue(getPathsFromTableFile("t6_managed", dumpPath, conf).size() ==
1);
+ }
+
+ @Test
+ public void testEmptyDiffForControlFailover() throws Throwable {
+
+ // In case of control failover both A & B will be in sync, so the table
diff should be created empty, without any
+ // error.
+ List<String> withClause =
ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
primary.repldDir + "'");
+
+ // Do a bootstrap cycle(A->B)
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do an incremental dump.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (100),(200)")
+ .run("insert into table t1 values (12),(35),(46)")
+ .run("create table t1_managed (id int)")
+ .run("insert into table t1_managed values (120)")
+ .run("insert into table t1_managed values (10),(321),(423)")
+ .dump(primaryDbName, withClause);
+
+ // Do an incremental load and see all the tables are there.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't1_managed'")
+ .verifyResult("t1_managed");
+
+ // Trigger reverse cycle. Do dump on target cluster.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "rev");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
newReplDir + "'");
+
+ // Do a reverse dump
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Even though no diff, the event ack file should be created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + "
doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Do a reverse load.
+ primary.load(primaryDbName, replicatedDbName, withClause);
+
+ // Check the table diff directory still gets created.
+ assertTrue(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Check the table diff is empty, since we are in sync, so no tables got
modified.
+ HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new
Path(tuple.dumpLocation), conf);
+ assertEquals("Table diff is not empty, contains :" + tableDiffEntries, 0,
tableDiffEntries.size());
+ }
+
+ @Test
+ public void testFirstIncrementalMandatory() throws Throwable {
+ List<String> withClause =
ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
primary.repldDir + "'");
+ // Create one external and one managed tables and do a bootstrap dump.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("insert into table t1 values (2),(3),(4)")
+ .run("create table t1_managed (id int)")
+ .run("insert into table t1_managed values (10)")
+ .run("insert into table t1_managed values (20),(31),(42)")
+ .dump(primaryDbName, withClause);
+
+ // Do a bootstrap load and check both managed and external tables are
loaded.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't1_managed'")
+ .verifyResult("t1_managed")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Trigger reverse dump just after the bootstrap cycle.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "1");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
newReplDir + "'");
+
+ // Do a dump on cluster B, it should throw an exception, since the first
incremental isn't done yet.
+ try {
+ replica.dump(replicatedDbName, withClause);
+ } catch (HiveException he) {
+ assertTrue(he.getMessage()
+ .contains("Replication dump not allowed for replicated database with
first incremental dump pending : " + replicatedDbName));
+ }
+
+    // Do an incremental cycle and check we don't get this exception.
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
primary.repldDir + "'");
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Retrigger reverse dump, this time it should be successful and event ack
should get created.
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
newReplDir + "'");
+
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check event ack file should get created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + "
doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+ }
+
+ @Test
+ public void testFailureCasesInTableDiffGeneration() throws Throwable {
+ List<String> withClause =
ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
primary.repldDir + "'");
+
+ // Do a bootstrap cycle(A->B)
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+    // Add some tables & do an incremental dump.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (1)")
+ .run("create table t1_managed (id string)")
+ .run("insert into table t1_managed values ('A')")
+ .dump(primaryDbName, withClause);
+
+ // Do an incremental load and check the tables are there.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId).run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't1_managed'")
+ .verifyResult("t1_managed")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do some modifications on the source cluster, so we have some entries in
the table diff.
+ primary.run("use " + primaryDbName)
+ .run("create table t2_managed (id string)")
+ .run("insert into table t1_managed values ('S')")
+ .run("insert into table t2_managed values ('A'),('B'),('C')");
+
+ // Do some modifications in another database to have unrelated events as
well after the last load, which should
+ // get filtered.
+
+ primary.run("create database " + extraPrimaryDb)
+ .run("use " + extraPrimaryDb)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (15),(1),(96)")
+ .run("create table t1_managed (id string)")
+ .run("insert into table t1_managed values ('SA'),('PS')");
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "reverse");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
newReplDir + "'");
+
+ // Trigger dump on target cluster.
+
+ replicaFs.setQuota(newReplDir, 1, 10000);
+ try {
+ tuple = replica.dump(replicatedDbName, withClause);
+ fail("Should have failed due to quota violation");
+ } catch (Exception e) {
+ // Ignore it is expected due to Quota violation.
+ }
+ // Check the event_ack file doesn't exist.
+ assertFalse("event ack file exists despite quota violation",
replicaFs.listFiles(newReplDir, true).hasNext());
+
+ // Set the quota to a value that makes sure event ack file gets created
and then fails
+ replicaFs.setQuota(newReplDir,
replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 3,
QUOTA_RESET);
+ try {
+ tuple = replica.dump(replicatedDbName, withClause);
+ fail("Should have failed due to quota violation");
+ } catch (Exception e) {
+ // Ignore it is expected due to Quota violation.
+ }
+
+ // Check the event ack file got created despite exception and failure.
+ assertEquals("event_ack", replicaFs.listFiles(newReplDir,
true).next().getPath().getName());
+
+ // Remove quota for a successful dump
+ replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
+
+ // Retry Dump
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check event ack file is there.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + "
doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+    // Set quota again to restrict creation of table diff in the middle of the
load.
+ replicaFs.setQuota(newReplDir,
replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 2,
QUOTA_RESET);
+
+ try {
+ primary.load(primaryDbName, replicatedDbName, withClause);
+ } catch (Exception e) {
+ // Ignore, expected due to quota violation.
+ }
+
+ // Check table diff in progress directory gets created.
+ assertTrue(new Path(tuple.dumpLocation,
TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation,
TABLE_DIFF_INPROGRESS_DIRECTORY)));
+
+    // Check table diff complete directory doesn't get created.
+ assertFalse(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " exist",
+ replicaFs.exists(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Set Quota to a value so that table diff complete gets created and we
fail post that.
+ replicaFs.setQuota(newReplDir,
replicaFs.getQuotaUsage(newReplDir).getFileAndDirectoryCount() + 1,
QUOTA_RESET);
+ try {
+ primary.load(primaryDbName, replicatedDbName, withClause);
+ fail("Expected failure due to quota violation");
+ } catch (Exception e) {
+ // Ignore, expected due to quota violation.
+ }
+
+ // Check table diff complete directory gets created.
+ assertTrue(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Remove the quota and see everything recovers.
+ replicaFs.setQuota(newReplDir, QUOTA_RESET, QUOTA_RESET);
+ primary.load(primaryDbName, replicatedDbName, withClause);
+
+    // Check the table diff complete directory gets created.
+ assertTrue(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation,
TABLE_DIFF_COMPLETE_DIRECTORY)));
+
+ // Check table diff in progress directory isn't there now.
+ assertFalse(new Path(tuple.dumpLocation,
TABLE_DIFF_INPROGRESS_DIRECTORY).toString() + " exist",
+ replicaFs.exists(new Path(tuple.dumpLocation,
TABLE_DIFF_INPROGRESS_DIRECTORY)));
+
+ // Check the entries in table diff are correct.
+ HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new
Path(tuple.dumpLocation), conf);
+ assertTrue("Table Diff Contains " + tableDiffEntries,
+ tableDiffEntries.containsAll(Arrays.asList("t1_managed",
"t2_managed")));
+ }
+}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index d05ff85..b3ddbf5 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -3031,30 +3031,6 @@ public class TestReplicationScenariosAcidTables extends
BaseReplicationScenarios
assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
}
- @Test
- public void testReplTargetOfReplication() throws Throwable {
- // Bootstrap
- WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
null);
- replica.load(replicatedDbName,
primaryDbName).verifyReplTargetProperty(replicatedDbName);
- verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId,
true);
-
- //Try to do a dump on replicated db. It should fail
- replica.run("alter database " + replicatedDbName + " set dbproperties
('repl.source.for'='1')");
- try {
- replica.dump(replicatedDbName);
- } catch (Exception e) {
- Assert.assertEquals("Cannot dump database as it is a Target of
replication.", e.getMessage());
-
Assert.assertEquals(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getErrorCode(),
- ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode());
- }
- replica.run("alter database " + replicatedDbName + " set dbproperties
('repl.source.for'='')");
-
- //Try to dump a different db on replica. That should succeed
- replica.run("create database " + replicatedDbName + "_extra with
dbproperties ('repl.source.for' = '1, 2, 3')")
- .dump(replicatedDbName + "_extra");
- replica.run("drop database if exists " + replicatedDbName + "_extra
cascade");
- }
-
private void verifyPathExist(FileSystem fs, Path filePath) throws
IOException {
assertTrue("Path not found:" + filePath, fs.exists(filePath));
}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 37c3970..d9c63c6 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1084,10 +1084,8 @@ public class TestReplicationScenariosAcrossInstances
extends BaseReplicationAcro
.verifyResult(tuplePrimary.lastReplicationId)
.run("show tblproperties t1('custom.property')")
.verifyResults(new String[] { "custom.property\tcustom.value" })
- .dumpFailure(replicatedDbName)
.run("alter database " + replicatedDbName
- + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' =
'1, 2, 3')")
- .dumpFailure(replicatedDbName); //can not dump the db before
first successful incremental load is done.
+ + " set dbproperties ('" + SOURCE_OF_REPLICATION + "' =
'1, 2, 3')");
// do a empty incremental load to allow dump of replicatedDbName
WarehouseInstance.Tuple temp = primary.dump(primaryDbName,
Collections.emptyList());
@@ -1178,8 +1176,6 @@ public class TestReplicationScenariosAcrossInstances
extends BaseReplicationAcro
replica.load(replicatedDbName, primaryDbName);
assertTrue(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName)));
- replica.dumpFailure(replicatedDbName); //can not dump db which is target
of replication
-
replica.run("ALTER DATABASE " + replicatedDbName + " Set
DBPROPERTIES('repl.target.for' = '')");
assertFalse(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName)));
replica.dump(replicatedDbName);
@@ -1192,7 +1188,6 @@ public class TestReplicationScenariosAcrossInstances
extends BaseReplicationAcro
replica.getDatabase(replicatedDbName).getParameters());
assertTrue(MetaStoreUtils.isTargetOfReplication(replica.getDatabase(replicatedDbName)));
- replica.dumpFailure(replicatedDbName); //Cannot dump database which is
target of replication.
}
@Test
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 1191267..eee5702 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -227,7 +227,7 @@ public class WarehouseInstance implements Closeable {
driver.getResults(lastResults);
}
// Split around the 'tab' character
- return (lastResults.get(0).split("\\t"))[colNum];
+ return !lastResults.isEmpty() ? (lastResults.get(0).split("\\t"))[colNum]
: "";
}
public WarehouseInstance run(String command) throws Throwable {
@@ -388,7 +388,7 @@ public class WarehouseInstance implements Closeable {
List<String> lowerCaseData =
Arrays.stream(data).map(String::toLowerCase).collect(Collectors.toList());
assertEquals(data.length, filteredResults.size());
- assertTrue(StringUtils.join(filteredResults, ",") + " does not contain all
expected" + StringUtils
+ assertTrue(StringUtils.join(filteredResults, ",") + " does not contain all
expected " + StringUtils
.join(lowerCaseData, ","),
filteredResults.containsAll(lowerCaseData));
return this;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
new file mode 100644
index 0000000..85bbbec
--- /dev/null
+++
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import
org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hadoop.hive.ql.parse.ReplicationSpec.getLastReplicatedStateFromParameters;
+
+/**
+ * Utility class for handling operations regarding optimised bootstrap in case
of replication.
+ */
+public class OptimisedBootstrapUtils {
+
+ /** Separator used to separate entries in the listing. */
+ public static final String FILE_ENTRY_SEPARATOR = "#";
+ private static Logger LOG =
LoggerFactory.getLogger(OptimisedBootstrapUtils.class);
+
+ /** table diff directory when in progress */
+ public static final String TABLE_DIFF_INPROGRESS_DIRECTORY = "table_diff";
+
+ /** table diff directory when complete */
+ public static final String TABLE_DIFF_COMPLETE_DIRECTORY =
"table_diff_complete";
+
+ /** event ack file which contains the event id till which the cluster was
last loaded. */
+ public static final String EVENT_ACK_FILE = "event_ack";
+
+ /**
+ * Gets & checks whether the database is target of replication.
+ * @param dbName name of database
+ * @param hive hive object
+ * @return true, if the database has repl.target.for property set.
+ * @throws HiveException
+ */
+ public static boolean isFailover(String dbName, Hive hive) throws
HiveException {
+ Database database = hive.getDatabase(dbName);
+ return database != null ? MetaStoreUtils.isTargetOfReplication(database) :
false;
+ }
+
+ public static boolean checkFileExists(Path dumpPath, HiveConf conf, String
fileName) throws IOException {
+ FileSystem fs = dumpPath.getFileSystem(conf);
+ return fs.exists(new Path(dumpPath, fileName));
+ }
+
+ /**
+ * Gets the event id from the event ack file
+ * @param dumpPath the dump path
+ * @param conf the hive configuration
+ * @return the event id from file.
+ * @throws IOException
+ */
+ public static String getEventIdFromFile(Path dumpPath, HiveConf conf) throws
IOException {
+ String lastEventId;
+ Path eventAckFilePath = new Path(dumpPath, EVENT_ACK_FILE);
+ FileSystem fs = eventAckFilePath.getFileSystem(conf);
+ try (FSDataInputStream stream = fs.open(eventAckFilePath);) {
+ lastEventId = IOUtils.toString(stream, Charset.defaultCharset());
+ }
+ return lastEventId.replaceAll(System.lineSeparator(),"").trim();
+ }
+
+ /**
+ * Gets the name of tables in the table diff file.
+ * @param dumpPath the dump path
+ * @param conf the hive configuration
+ * @return Set with list of tables
+ * @throws Exception
+ */
+ public static HashSet<String> getTablesFromTableDiffFile(Path dumpPath,
HiveConf conf) throws Exception {
+ FileSystem fs = dumpPath.getFileSystem(conf);
+ Path tableDiffPath = new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+ FileStatus[] list = fs.listStatus(tableDiffPath);
+ HashSet<String> tables = new HashSet<>();
+ for (FileStatus fStatus : list) {
+ tables.add(fStatus.getPath().getName());
+ }
+ return tables;
+ }
+
+ /**
+ * Extracts the recursive listing from the table file.
+ * @param file the name of table
+ * @param dumpPath the dump path
+ * @param conf the hive conf
+ * @return the list of paths in the table.
+ * @throws IOException
+ */
+ public static HashSet<String> getPathsFromTableFile(String file, Path
dumpPath, HiveConf conf) throws IOException {
+ HashSet<String> paths = new HashSet<>();
+ FileSystem fs = dumpPath.getFileSystem(conf);
+ Path tableDiffPath = new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY);
+ Path filePath = new Path(tableDiffPath, file);
+ String allEntries;
+ try (FSDataInputStream stream = fs.open(filePath);) {
+ allEntries = IOUtils.toString(stream, Charset.defaultCharset());
+ }
+
paths.addAll(Arrays.asList(allEntries.split(System.lineSeparator())).stream().filter(item
-> !item.isEmpty())
+ .collect(Collectors.toSet()));
+ return paths;
+ }
+
+ /**
+ * Gets the event id stored in database denoting the last loaded event id.
+ * @param dbName the name of database
+ * @param hiveDb the hive object
+ * @return event id from the database
+ * @throws HiveException
+ */
+ public static String getReplEventIdFromDatabase(String dbName, Hive hiveDb)
throws HiveException {
+ Database database = hiveDb.getDatabase(dbName);
+ String currentLastEventId =
getLastReplicatedStateFromParameters(database.getParameters());
+ return currentLastEventId;
+ }
+
+ /**
+ * Validates if the first incremental is done before starting optimised
bootstrap
+ * @param dbName name of database
+ * @param hiveDb the hive object
+ * @throws HiveException
+ */
+ public static void isFirstIncrementalPending(String dbName, Hive hiveDb)
throws HiveException {
+ Database database = hiveDb.getDatabase(dbName);
+ if (database == null ||
ReplUtils.isFirstIncPending(database.getParameters()))
+ throw new HiveException(
+ "Replication dump not allowed for replicated database with first
incremental dump pending : " + dbName);
+ }
+
+ /**
+ * Creates the event ack file and sets the dump metadata post that marking
completion of dump flow for first round
+ * of optimised failover dump.
+ * @param currentDumpPath the dump path
+ * @param dmd the dump metadata
+ * @param cmRoot the cmRoot
+ * @param dbEventId the database event id to which we have to write in the
file.
+ * @param conf the hive configuraiton
+ * @param work the repldump work
+ * @return the lastReplId denoting a fake dump(-1) always
+ * @throws SemanticException
+ */
+ public static Long createAndGetEventAckFile(Path currentDumpPath,
DumpMetaData dmd, Path cmRoot, String dbEventId,
+ HiveConf conf, ReplDumpWork work)
+ throws SemanticException {
+ // Keep an invalid value for lastReplId, to denote it isn't a actual dump.
+ Long lastReplId = -1L;
+ Path filePath = new Path(currentDumpPath, EVENT_ACK_FILE);
+ Utils.writeOutput(dbEventId, filePath, conf);
+ LOG.info("Created event_ack file at {} with eventId {}", filePath,
dbEventId);
+ work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(),
String.valueOf(lastReplId)));
+ dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot, -1L,
false);
+ dmd.write(true);
+ return lastReplId;
+ }
+
+ /**
+ * Prepares the table diff file, with tables modified post the specified
event id.
+ * @param eventId the event id after which tables should be modified
+ * @param hiveDb the hive object
+ * @param work the load work
+ * @param conf hive configuration
+ * @throws Exception
+ */
+ public static void prepareTableDiffFile(Long eventId, Hive hiveDb,
ReplLoadWork work, HiveConf conf)
+ throws Exception {
+ // Get the notification events.
+ List<NotificationEvent> notificationEvents =
+ hiveDb.getMSC().getNextNotification(eventId - 1, -1, new
DatabaseAndTableFilter(work.dbNameToLoadIn, null))
+ .getEvents();
+
+ // Check the first eventId fetched is the same as what we fed, to ensure
the events post that hasn't expired.
+ if (notificationEvents.get(0).getEventId() != eventId) {
+ throw new Exception("Failover notification events expired.");
+ }
+ // Remove the first one, it is already loaded, we fetched it to confirm
the notification events post that haven't
+ // expired.
+ notificationEvents.remove(0);
+ HashSet<String> modifiedTables = new HashSet<>();
+ for (NotificationEvent event : notificationEvents) {
+ String tableName = event.getTableName();
+ if (tableName != null) {
+ LOG.debug("Added table {} because of eventId {} and eventType {}",
event.getTableName(), event.getEventId(),
+ event.getEventType());
+ modifiedTables.add(event.getTableName());
+ }
+ }
+ Path dumpPath = new Path(work.dumpDirectory).getParent();
+ FileSystem fs = dumpPath.getFileSystem(conf);
+ Path diffFilePath = new Path(dumpPath, TABLE_DIFF_INPROGRESS_DIRECTORY);
+ fs.mkdirs(diffFilePath);
+ for (String table : modifiedTables) {
+ String tables = "";
+ LOG.info("Added table {} to table diff", table);
+ ArrayList<String> pathList = getListing(work.dbNameToLoadIn, table,
hiveDb, conf);
+ for (String path : pathList) {
+ tables += path + System.lineSeparator();
+ }
+ Utils.writeOutput(tables, new Path(diffFilePath, table), conf);
+ }
+ LOG.info("Completed writing table diff progress file at {} with entries
{}", dumpPath, modifiedTables);
+ // The operation is complete, we can rename to TABLE_DIFF_COMPLETE
+ fs.rename(diffFilePath, new Path(dumpPath, TABLE_DIFF_COMPLETE_DIRECTORY));
+ LOG.info("Completed renaming table diff progress file to table diff
complete file.");
+ }
+
+ private static ArrayList<String> getListing(String dbName, String tableName,
Hive hiveDb, HiveConf conf)
+ throws HiveException, IOException {
+ ArrayList<String> paths = new ArrayList<>();
+ Table table = hiveDb.getTable(dbName, tableName, false);
+ if (table == null) {
+ LOG.info("Table {} not found, excluding the dropped table", tableName);
+ // If the table is not there, return an empty list of paths, we would
need to copy the entire stuff.
+ return new ArrayList<>();
+ }
+ Path tableLocation = new Path(table.getSd().getLocation());
+ paths.add(table.getSd().getLocation());
+ FileSystem tableFs = tableLocation.getFileSystem(conf);
+ buildListingForDirectory(paths, tableLocation, tableFs);
+ // Check if the table is partitioned, in case the table is partitioned we
need to check for the partitions
+ // listing as well.
+ if (table.isPartitioned()) {
+ List<Partition> partitions = hiveDb.getPartitions(table);
+ for (Partition part : partitions) {
+ Path partPath = part.getDataLocation();
+ // Build listing for the partition only if it doesn't lies within the
table location, else it would have been
+ // already included as part of recursive listing of table directory.
+ if (!FileUtils.isPathWithinSubtree(partPath, tableLocation)) {
+ buildListingForDirectory(paths, partPath, tableFs);
+ }
+ }
+ }
+ return paths;
+ }
+
+ private static void buildListingForDirectory(ArrayList<String> listing, Path
tableLocation, FileSystem tableFs)
+ throws IOException {
+ if (!tableFs.exists(tableLocation)) {
+ return;
+ }
+ RemoteIterator<FileStatus> itr = tableFs.listStatusIterator(tableLocation);
+ while (itr.hasNext()) {
+ FileStatus fstatus = itr.next();
+ if (fstatus.isDirectory()) {
+ listing.add(fstatus.getPath().toString());
+ buildListingForDirectory(listing, fstatus.getPath(), tableFs);
+ } else {
+ listing.add(fstatus.getPath() + FILE_ENTRY_SEPARATOR +
fstatus.getLen() + FILE_ENTRY_SEPARATOR + tableFs
+ .getFileChecksum(fstatus.getPath()));
+ }
+ }
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index de18fe6..a9d728d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -125,6 +125,14 @@ import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_EXTERNAL_WAREHO
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
import static
org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static
org.apache.hadoop.hive.metastore.ReplChangeManager.getReplPolicyIdString;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkFileExists;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.createAndGetEventAckFile;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getReplEventIdFromDatabase;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFailover;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirstIncrementalPending;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
import static
org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.cleanupSnapshots;
@@ -139,6 +147,7 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
private static final long SLEEP_TIME_FOR_TESTS = 30000;
private Set<String> tablesForBootstrap = new HashSet<>();
private List<TxnType> excludedTxns = Arrays.asList(TxnType.READ_ONLY,
TxnType.REPL_CREATED);
+ private boolean createEventMarker = false;
public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk",
"f_");
private final String name;
@@ -178,19 +187,21 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
}
Path previousValidHiveDumpPath =
getPreviousValidDumpMetadataPath(dumpRoot);
boolean isFailoverMarkerPresent = false;
- if (previousValidHiveDumpPath == null) {
+ boolean isFailover = isFailover(work.dbNameOrPattern, getHive());
+ LOG.debug("Database is {} going through failover", isFailover ? "" :
"not");
+ if (previousValidHiveDumpPath == null && !isFailover) {
work.setBootstrap(true);
} else {
- work.setOldReplScope(new DumpMetaData(previousValidHiveDumpPath,
conf).getReplScope());
- isFailoverMarkerPresent =
isDumpFailoverReady(previousValidHiveDumpPath);
+ work.setOldReplScope(isFailover ? null : new
DumpMetaData(previousValidHiveDumpPath, conf).getReplScope());
+ isFailoverMarkerPresent = !isFailover &&
isDumpFailoverReady(previousValidHiveDumpPath);
}
//Proceed with dump operation in following cases:
//1. No previous dump is present.
//2. Previous dump is already loaded and it is not in failover ready
status.
- if (shouldDump(previousValidHiveDumpPath, isFailoverMarkerPresent)) {
+ if (shouldDump(previousValidHiveDumpPath, isFailoverMarkerPresent,
isFailover)) {
Path currentDumpPath = getCurrentDumpPath(dumpRoot,
work.isBootstrap());
Path hiveDumpRoot = new Path(currentDumpPath,
ReplUtils.REPL_HIVE_BASE_DIR);
- if (!work.isBootstrap()) {
+ if (!work.isBootstrap() && !isFailover) {
preProcessFailoverIfRequired(previousValidHiveDumpPath,
isFailoverMarkerPresent);
}
// Set distCp custom name corresponding to the replication policy.
@@ -211,14 +222,37 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
Long lastReplId;
LOG.info("Data copy at load enabled : {}",
conf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET));
- if (work.isBootstrap()) {
+ if (isFailover) {
+ LOG.info("Optimised Bootstrap Dump triggered for {}.",
work.dbNameOrPattern);
+ // Before starting optimised bootstrap, check if the first
incremental is done to ensure database is in
+ // consistent state.
+ isFirstIncrementalPending(work.dbNameOrPattern, getHive());
+ // Get the last replicated event id from the database.
+ String dbEventId =
getReplEventIdFromDatabase(work.dbNameOrPattern, getHive());
+ // Check if the tableDiff directory is present or not.
+ boolean isTableDiffDirectoryPresent =
checkFileExists(currentDumpPath, conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+ if (createEventMarker) {
+ LOG.info("Creating event_ack file for database {} with event id
{}.", work.dbNameOrPattern, dbEventId);
+ lastReplId = createAndGetEventAckFile(currentDumpPath, dmd,
cmRoot, dbEventId, conf, work);
+ finishRemainingTasks();
+ } else {
+ // We should be here only if TableDiff is Present.
+ assert isTableDiffDirectoryPresent;
+ // TODO: Dump using TableDiff file & get lastReplId
+ lastReplId = -1L;
+ }
+ }
+ else if (work.isBootstrap()) {
lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, getHive());
} else {
work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath));
lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, getHive());
}
-
work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(),
String.valueOf(lastReplId)));
- initiateDataCopyTasks();
+ // The datacopy doesn't need to be initialised in case of optimised
bootstrap first dump.
+ if (lastReplId >= 0) {
+
work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(),
String.valueOf(lastReplId)));
+ initiateDataCopyTasks();
+ }
} else {
if (isFailoverMarkerPresent) {
LOG.info("Previous Dump is failover ready. Skipping this
iteration.");
@@ -487,16 +521,46 @@ public class ReplDumpTask extends Task<ReplDumpWork>
implements Serializable {
return false;
}
- private boolean shouldDump(Path previousDumpPath, boolean
isFailoverMarkerPresent) throws IOException {
+ private boolean shouldDump(Path previousDumpPath, boolean
isFailoverMarkerPresent, boolean isFailover)
+ throws IOException, HiveException {
/** a) If there is no previous dump dir found, the current run is
bootstrap case.
* b) If the previous dump was successful and it contains failover marker
file as well as
* HiveConf.ConfVars.HIVE_REPL_FAILOVER_START == true, last dump was a
controlled failover dump,
* skip doing any further dump.
*/
if (previousDumpPath == null) {
+ createEventMarker = isFailover;
return true;
} else if (isFailoverMarkerPresent && shouldFailover()) {
return false;
+ } else if (isFailover) {
+ // In case of OptimisedBootstrap Failover, We need to do a dump in case:
+ // 1. No EVENT_ACK file is there.
+ // 2. EVENT_ACK file and TABLE_DIFF_COMPLETE file is also there and the
current database id is same as that in
+ // the EVENT_ACK file
+ boolean isEventAckFilePresent =
checkFileExists(previousDumpPath.getParent(), conf, EVENT_ACK_FILE);
+ if (!isEventAckFilePresent) {
+ // If in the previous valid dump path, Event_Ack isn't there that
means the previous one was a normal dump,
+ // we need to trigger the failover dump
+ LOG.debug("EVENT_ACK file not found in {}. Proceeding with
OptimisedBootstrap Failover",
+ previousDumpPath.getParent());
+ createEventMarker = true;
+ return true;
+ }
+ // Event_ACK file is present check if it contains correct value or not.
+ String fileEventId = getEventIdFromFile(previousDumpPath.getParent(),
conf);
+ String dbEventId = getReplEventIdFromDatabase(work.dbNameOrPattern,
getHive()).trim();
+ if (!dbEventId.equalsIgnoreCase(fileEventId)) {
+ // In case the database event id changed post table_diff_complete
generation, that means both forward &
+ // backward policies are operational, We fail in that case with
non-recoverable error.
+ LOG.error("The database eventID {} and the event id in the EVENT_ACK
file {} both mismatch. FilePath {}",
+ dbEventId, fileEventId, previousDumpPath.getParent());
+ throw new RuntimeException("Database event id changed post table diff
generation.");
+ } else {
+ // Check table_diff_complete and Load_ACK
+ return checkFileExists(previousDumpPath.getParent(), conf,
TABLE_DIFF_COMPLETE_DIRECTORY) && checkFileExists(previousDumpPath,
+ conf, LOAD_ACKNOWLEDGEMENT.toString());
+ }
} else {
FileSystem fs = previousDumpPath.getFileSystem(conf);
return fs.exists(new Path(previousDumpPath,
LOAD_ACKNOWLEDGEMENT.toString()));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index bac747f..3a2dda9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl;
import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
@@ -88,6 +89,10 @@ import java.util.LinkedList;
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkFileExists;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.prepareTableDiffFile;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_METADATA;
import static
org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.getExternalTableBaseDir;
import static
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
@@ -680,6 +685,30 @@ public class ReplLoadTask extends Task<ReplLoadWork>
implements Serializable {
}
Database targetDb = getHive().getDatabase(work.dbNameToLoadIn);
Map<String, String> props = new HashMap<>();
+
+ // Check if it is a optimised bootstrap failover.
+ if (work.isFailover) {
+ // Check it should be marked as target of replication & not source of
replication.
+ if (MetaStoreUtils.isTargetOfReplication(targetDb)) {
+ LOG.error("The database {} is already marked as target for
replication", targetDb.getName());
+ throw new Exception("Failover target is already marked as target");
+ }
+ if (!ReplChangeManager.isSourceOfReplication(targetDb)) {
+ LOG.error("The database {} is already source of replication.",
targetDb.getName());
+ throw new Exception("Failover target was not source of replication");
+ }
+ boolean isTableDiffPresent =
+ checkFileExists(new Path(work.dumpDirectory).getParent(), conf,
TABLE_DIFF_COMPLETE_DIRECTORY);
+ Long eventId = Long.parseLong(getEventIdFromFile(new
Path(work.dumpDirectory).getParent(), conf));
+ if (!isTableDiffPresent) {
+ prepareTableDiffFile(eventId, getHive(), work, conf);
+ if (this.childTasks == null) {
+ this.childTasks = new ArrayList<>();
+ }
+ createReplLoadCompleteAckTask();
+ return 0;
+ }
+ }
if (!MetaStoreUtils.isTargetOfReplication(targetDb)) {
props.put(ReplConst.TARGET_OF_REPLICATION, ReplConst.TRUE);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 7c51df5..cf38651 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
import static
org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_EXECUTIONID;
import static
org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_SCHEDULENAME;
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_STATS_TOP_EVENTS_COUNTS;
+import static
org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.checkFileExists;
@Explain(displayName = "Replication Load Operator", explainLevels = {
Explain.Level.USER,
Explain.Level.DEFAULT,
@@ -86,6 +87,7 @@ public class ReplLoadWork implements Serializable,
ReplLoadWorkMBean {
private String scheduledQueryName;
private String executionId;
private boolean shouldFailover;
+ public boolean isFailover;
/*
these are sessionState objects that are copied over to work to allow for
parallel execution.
@@ -134,6 +136,8 @@ public class ReplLoadWork implements Serializable,
ReplLoadWorkMBean {
FileSystem fs = failoverReadyMarker.getFileSystem(hiveConf);
shouldFailover =
hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)
&& fs.exists(failoverReadyMarker);
+ isFailover =
+ checkFileExists(new Path(dumpDirectory).getParent(), hiveConf,
OptimisedBootstrapUtils.EVENT_ACK_FILE);
incrementalLoadTasksBuilder = new
IncrementalLoadTasksBuilder(dbNameToLoadIn, dumpDirectory,
new IncrementalLoadEventsIterator(dumpDirectory, hiveConf),
hiveConf, eventTo, metricCollector,
replStatsTracker, shouldFailover);
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index dd611c0..0b9eb57 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -183,8 +183,8 @@ public class ReplicationSemanticAnalyzer extends
BaseSemanticAnalyzer {
"{} is set to TARGET.", dbName,
ReplConst.REPL_FAILOVER_ENDPOINT);
ReplUtils.unsetDbPropIfSet(database,
ReplConst.TARGET_OF_REPLICATION, db);
} else {
- LOG.error("Cannot dump database " + dbNameOrPattern + " as it is a
target of replication (repl.target.for)");
- throw new
SemanticException(ErrorMsg.REPL_DATABASE_IS_TARGET_OF_REPLICATION.getMsg());
+ LOG.warn("Database " + dbNameOrPattern + " is marked as target of
replication (repl.target.for), Will "
+ + "trigger failover.");
}
}
} else {