This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new fe0f1a648b1 HIVE-26301: Fix ACID tables bootstrap during reverse replication in unplanned failover (Haymant Mangla reviewed by Peter Vary) (#3352)
fe0f1a648b1 is described below
commit fe0f1a648b14cdf27edcf7a5d323cbd060104ebf
Author: Haymant Mangla <[email protected]>
AuthorDate: Fri Jun 10 16:06:58 2022 +0530
HIVE-26301: Fix ACID tables bootstrap during reverse replication in unplanned failover (Haymant Mangla reviewed by Peter Vary) (#3352)
---
.../parse/TestReplicationOptimisedBootstrap.java | 360 ++++-----------------
.../TestReplicationScenariosExclusiveReplica.java | 292 ++++++++++++++++-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 5 +-
3 files changed, 349 insertions(+), 308 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 5bd6ac3d362..673e41b3065 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -23,14 +23,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
-import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -71,7 +68,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInstances {
+public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosAcidTables {
String extraPrimaryDb;
@@ -84,8 +81,9 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true");
overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, UserGroupInformation.getCurrentUser().getUserName());
overrides.put(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname, "true");
-
- internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationOptimisedBootstrap.class);
+ overrides.put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+ overrides.put("hive.in.repl.test", "true");
+ internalBeforeClassSetup(overrides, TestReplicationOptimisedBootstrap.class);
}
@Before
@@ -112,7 +110,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
.run("create external table t2 (place string) partitioned by (country
string)")
.run("insert into table t2 partition(country='india') values
('chennai')")
.run("insert into table t2 partition(country='us') values ('new
york')")
- .run("create table t1_managed (id int)")
+ .run("create table t1_managed (id int) clustered by(id) into 3 buckets
stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1_managed values (10)")
.run("insert into table t1_managed values (20),(31),(42)")
.run("create table t2_managed (place string) partitioned by (country
string)")
@@ -125,14 +124,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
- .run("show tables like 't1'")
- .verifyResult("t1")
- .run("show tables like 't2'")
- .verifyResult("t2")
- .run("show tables like 't1_managed'")
- .verifyResult("t1_managed")
- .run("show tables like 't2_managed'")
- .verifyResult("t2_managed")
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2", "t1_managed", "t2_managed"})
.verifyReplTargetProperty(replicatedDbName);
// Do an incremental dump & load. Add one table which we can drop & an empty table as well.
@@ -145,10 +138,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
replica.load(replicatedDbName, primaryDbName, withClause)
.run("use " + replicatedDbName)
- .run("show tables like 't5_managed'")
- .verifyResult("t5_managed")
- .run("show tables like 't6_managed'")
- .verifyResult("t6_managed")
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2", "t1_managed", "t2_managed",
"t5_managed", "t6_managed"})
.verifyReplTargetProperty(replicatedDbName);
// Do some modifications on the other database with similar table names & some modifications on the original source
@@ -161,7 +152,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
.run("create external table t4 (id int)")
.run("insert into table t4 values (100)")
.run("insert into table t4 values (201)")
- .run("create table t4_managed (id int)")
+ .run("create table t4_managed (id int) clustered by(id) into 3 buckets
stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
.run("insert into table t4_managed values (110)")
.run("insert into table t4_managed values (220)")
.run("insert into table t2 partition(country='france') values
('lyon')")
@@ -475,281 +467,34 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
}
@Test
- public void testTargetEventIdGenerationAfterFirstIncremental() throws Throwable {
- List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
- withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
-
- // Do a bootstrap cycle(A->B)
- primary.dump(primaryDbName, withClause);
- replica.load(replicatedDbName, primaryDbName, withClause);
-
- // Add some table & do an incremental dump.
- WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
- .run("create external table table1 (id int)")
- .run("insert into table table1 values (100)")
- .run("create table table1_managed (name string)")
- .run("insert into table table1_managed values ('ABC')")
- .dump(primaryDbName, withClause);
-
- // Do an incremental load
- replica.load(replicatedDbName, primaryDbName, withClause);
-
- // Get the latest notification from the notification log for the target database, just after replication.
- CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
-
- // Check the tables are there post incremental load.
- replica.run("repl status " + replicatedDbName)
- .verifyResult(tuple.lastReplicationId)
- .run("use " + replicatedDbName)
- .run("select id from table1")
- .verifyResult("100")
- .run("select name from table1_managed")
- .verifyResult("ABC")
- .verifyReplTargetProperty(replicatedDbName);
-
- // Do some modifications on the source cluster, so we have some entries in the table diff.
- primary.run("use " + primaryDbName)
- .run("create table table2_managed (id string)")
- .run("insert into table table1_managed values ('SDC')")
- .run("insert into table table2_managed values ('A'),('B'),('C')");
-
-
- // Do some modifications in another database to have unrelated events as well after the last load, which should
- // get filtered.
-
- primary.run("create database " + extraPrimaryDb)
- .run("use " + extraPrimaryDb)
- .run("create external table t1 (id int)")
- .run("insert into table t1 values (15),(1),(96)")
- .run("create table t1_managed (id string)")
- .run("insert into table t1_managed values ('SA'),('PS')");
-
- // Do some modifications on the target database.
- replica.run("use " + replicatedDbName)
- .run("alter database "+ replicatedDbName + " set DBPROPERTIES
('key1'='value1')")
- .run("alter database "+ replicatedDbName + " set DBPROPERTIES
('key2'='value2')");
-
- // Validate the current replication id on original target has changed now.
- assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
-
- // Prepare for reverse replication.
- DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
- Path newReplDir = new Path(replica.repldDir + "reverse1");
- replicaFs.mkdirs(newReplDir);
- withClause = ReplicationTestUtils.includeExternalTableClause(true);
- withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" +
newReplDir + "'");
-
- tuple = replica.dump(replicatedDbName);
-
- // Check event ack file should get created.
- assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
- replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
-
- // Get the target event id.
- NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
- .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1,
- new DatabaseAndTableFilter(replicatedDbName, null));
-
- // There should be 2 events, two custom alter operations.
- assertEquals(2, nl.getEvents().size());
- }
-
- @Test
- public void testTargetEventIdGeneration() throws Throwable {
- // Do a cycle of bootstrap dump & load.
- List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
- withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
-
- // Do a bootstrap cycle(A->B)
- primary.dump(primaryDbName, withClause);
- replica.load(replicatedDbName, primaryDbName, withClause);
-
- // Add some table & do the first incremental dump.
- primary.run("use " + primaryDbName)
- .run("create external table tablei1 (id int)")
- .run("create external table tablei2 (id int)")
- .run("create table tablem1 (id int)")
- .run("create table tablem2 (id int)")
- .run("insert into table tablei1 values(1),(2),(3),(4)")
- .run("insert into table tablei2 values(10),(20),(30),(40)")
- .run("insert into table tablem1 values(5),(10),(15),(20)")
- .run("insert into table tablem2 values(6),(12),(18),(24)")
- .dump(primaryDbName, withClause);
-
- // Do the incremental load, and check everything is intact.
- replica.load(replicatedDbName, primaryDbName, withClause)
- .run("use "+ replicatedDbName)
- .run("select id from tablei1")
- .verifyResults(new String[]{"1","2","3","4"})
- .run("select id from tablei2")
- .verifyResults(new String[]{"10","20","30","40"})
- .run("select id from tablem1")
- .verifyResults(new String[]{"5","10","15","20"})
- .run("select id from tablem2")
- .verifyResults(new String[]{"6","12","18","24"});
-
- // Do some modifications & call for the second cycle of incremental dump &
load.
- WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
- .run("create external table table1 (id int)")
- .run("insert into table table1 values (25),(35),(82)")
- .run("create table table1_managed (name string)")
- .run("insert into table table1_managed values ('CAD'),('DAS'),('MSA')")
- .run("insert into table tablei1 values(15),(62),(25),(62)")
- .run("insert into table tablei2 values(10),(22),(11),(22)")
- .run("insert into table tablem1 values(5),(10),(15),(20)")
- .run("alter table table1 set TBLPROPERTIES('comment'='abc')")
- .dump(primaryDbName, withClause);
-
- // Do an incremental load
- replica.load(replicatedDbName, primaryDbName, withClause);
-
- // Get the latest notification from the notification log for the target database, just after replication.
- CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
-
- // Check the tables are there post incremental load.
- replica.run("repl status " + replicatedDbName)
- .verifyResult(tuple.lastReplicationId)
- .run("use " + replicatedDbName)
- .run("select id from table1")
- .verifyResults(new String[]{"25", "35", "82"})
- .run("select name from table1_managed")
- .verifyResults(new String[]{"CAD", "DAS", "MSA"})
- .verifyReplTargetProperty(replicatedDbName);
-
- // Do some modifications on the source cluster, so we have some entries in the table diff.
- primary.run("use " + primaryDbName)
- .run("create table table2_managed (id string)")
- .run("insert into table table1_managed values ('AAA'),('BBB')")
- .run("insert into table table2_managed values ('A1'),('B1'),('C2')");
-
-
- // Do some modifications in another database to have unrelated events as well after the last load, which should
- // get filtered.
-
- primary.run("create database " + extraPrimaryDb)
- .run("use " + extraPrimaryDb)
- .run("create external table table1 (id int)")
- .run("insert into table table1 values (15),(1),(96)")
- .run("create table table1_managed (id string)")
- .run("insert into table table1_managed values ('SAA'),('PSA')");
-
- // Do some modifications on the target database.
- replica.run("use " + replicatedDbName)
- .run("alter database "+ replicatedDbName + " set DBPROPERTIES
('repl1'='value1')")
- .run("alter database "+ replicatedDbName + " set DBPROPERTIES
('repl2'='value2')");
-
- // Validate the current replication id on original target has changed now.
- assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
-
- // Prepare for reverse replication.
- DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
- Path newReplDir = new Path(replica.repldDir + "reverse01");
- replicaFs.mkdirs(newReplDir);
- withClause = ReplicationTestUtils.includeExternalTableClause(true);
- withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
-
- tuple = replica.dump(replicatedDbName, withClause);
-
- // Check event ack file should get created.
- assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
- replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
-
- // Get the target event id.
- NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
- .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10,
- new DatabaseAndTableFilter(replicatedDbName, null));
-
- assertEquals(0, nl.getEventsSize());
- }
-
- @Test
- public void testTargetEventIdWithNotificationsExpired() throws Throwable {
- // Do a cycle of bootstrap dump & load.
- List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
- withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
-
- // Do a bootstrap cycle(A->B)
- primary.dump(primaryDbName, withClause);
- replica.load(replicatedDbName, primaryDbName, withClause);
+ public void testReverseBootstrap() throws Throwable {
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ List<String> withClause = setUpFirstIterForOptimisedBootstrap();
- // Add some table & do the first incremental dump.
- primary.run("use " + primaryDbName)
- .run("create external table tablei1 (id int)")
- .run("create table tablem1 (id int)")
- .run("insert into table tablei1 values(1),(2),(3),(4)")
- .run("insert into table tablem1 values(5),(10),(15),(20)")
- .dump(primaryDbName, withClause);
+ // Open 3 txns for a database which is not under replication
+ int numTxnsForSecDb = 3;
+ List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler,
primaryConf);
- // Do the incremental load, and check everything is intact.
- replica.load(replicatedDbName, primaryDbName, withClause)
- .run("use "+ replicatedDbName)
- .run("select id from tablei1")
- .verifyResults(new String[]{"1","2","3","4"})
- .run("select id from tablem1")
- .verifyResults(new String[]{"5","10","15","20"});
-
- // Explicitly make the notification logs.
- // Get the latest notification from the notification log for the target database, just after replication.
- CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
- // Inject a behaviour where some events are missing from the notification_log table.
- // This ensures the incremental dump doesn't get all events for replication.
- InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>
- eventIdSkipper =
- new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() {
-
- @Nullable
- @Override
- public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
- if (null != eventIdList) {
- List<NotificationEvent> eventIds = eventIdList.getEvents();
- List<NotificationEvent> outEventIds = new ArrayList<>();
- for (NotificationEvent event : eventIds) {
- // Skip the last db event.
- if (event.getDbName().equalsIgnoreCase(replicatedDbName)) {
- injectionPathCalled = true;
- continue;
- }
- outEventIds.add(event);
- }
-
- // Return the new list
- return new NotificationEventResponse(outEventIds);
- } else {
- return null;
- }
- }
- };
+ Map<String, Long> tablesInSecDb = new HashMap<>();
+ tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+ tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+ List<Long> lockIdsForSecDb =
allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
+ tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
- try {
- InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);
-
- // Prepare for reverse replication.
- DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
- Path newReplDir = new Path(replica.repldDir + "reverse01");
- replicaFs.mkdirs(newReplDir);
- withClause = ReplicationTestUtils.includeExternalTableClause(true);
- withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
-
- try {
- replica.dump(replicatedDbName, withClause);
- fail("Expected the dump to fail since the notification event is
missing.");
- } catch (Exception e) {
- // Expected due to missing notification log entry.
- }
-
- // Check if there is a non-recoverable error or not.
- Path nonRecoverablePath =
- TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf);
- assertTrue(replicaFs.exists(nonRecoverablePath));
- } finally {
- InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
- }
- }
+ // Open 2 txns for the primary db
+ int numTxnsForPrimaryDb = 2;
+ List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler,
primaryConf);
+ // Allocate write ids for both tables of source database.
+ Map<String, Long> tablesInSourceDb = new HashMap<>();
+ tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 4);
+ tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb);
+ allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
+ txnsForSourceDb, replica.getConf());
- @Test
- public void testReverseBootstrap() throws Throwable {
- List<String> withClause = setUpFirstIterForOptimisedBootstrap();
+ // Open 1 txn with no hive locks acquired
+ List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
// Do a reverse second dump; this should do a bootstrap dump for the tables in the table_diff and incremental for the rest.
@@ -757,6 +502,14 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause);
+ // Verify that open txns for sourceDb were aborted before proceeding with the bootstrap dump.
+ verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
+ verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+ verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+ txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+ txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+ releaseLocks(txnHandler, lockIdsForSecDb);
+
String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
// _bootstrap directory should be created as bootstrap is enabled on external tables.
Path dumpPath1 = new Path(hiveDumpDir, INC_BOOTSTRAP_ROOT_DIR_NAME +"/" + EximUtil.METADATA_PATH_NAME +"/" + replicatedDbName);
@@ -950,7 +703,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
// Create some partitioned and non partitioned tables and do a dump & load.
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
- .run("create table t1 (id int)")
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored
as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1 values (1)")
.run("insert into table t1 values (2),(3),(4)")
.run("create table t2 (id int)")
@@ -968,14 +722,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
.run("repl status " + replicatedDbName)
.verifyResult(tuple.lastReplicationId)
.run("use " + replicatedDbName)
- .run("show tables like 't1'")
- .verifyResult("t1")
- .run("show tables like 't2'")
- .verifyResult("t2")
- .run("show tables like 't3'")
- .verifyResult("t3")
- .run("show tables like 't4'")
- .verifyResult("t4")
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2", "t3", "t4"})
.verifyReplTargetProperty(replicatedDbName);
// Prepare for reverse bootstrap.
@@ -1083,7 +831,10 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
// Create 4 managed tables and do a dump & load.
WarehouseInstance.Tuple tuple =
- primary.run("use " + primaryDbName).run("create table t1 (id
int)").run("insert into table t1 values (1)")
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets
stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into table t1 values (1)")
.run("insert into table t1 values (2),(3),(4)")
.run("create table t2 (place string) partitioned by (country
string)")
.run("insert into table t2 partition(country='india') values
('chennai')")
@@ -1100,7 +851,8 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationAcrossInst
.verifyResult("t3").run("show tables like
't4'").verifyResult("t4").verifyReplTargetProperty(replicatedDbName);
// Do some modifications on the original source cluster. The diff becomes (tnew_managed, t1, t2, t3)
- primary.run("use " + primaryDbName).run("create table tnew_managed (id
int)")
+ primary.run("use " + primaryDbName).run("create table tnew_managed (id
int) clustered by(id) into 3 buckets " +
+ "stored as orc tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1 values (25)").run("insert into table
tnew_managed values (110)")
.run("insert into table t2 partition(country='france') values
('lyon')").run("drop table t3")
.run("alter database "+ primaryDbName + " set DBPROPERTIES
('key1'='value1')");
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
index c9f4753ba99..8710e2c70a0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -19,10 +19,17 @@ package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -31,6 +38,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
@@ -39,7 +47,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -47,12 +54,22 @@ import java.util.Map;
import java.util.Set;
import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
+import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* Test replication scenarios with staging on replica.
*/
public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcrossInstances {
+ String extraPrimaryDb;
+
@BeforeClass
public static void classLevelSetup() throws Exception {
Map<String, String> overrides = new HashMap<>();
@@ -68,6 +85,7 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr
@Before
public void setup() throws Throwable {
super.setup();
+ extraPrimaryDb = "extra_" + primaryDbName;
}
@After
@@ -75,6 +93,278 @@ public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcr
super.tearDown();
}
+ @Test
+ public void testTargetEventIdGenerationAfterFirstIncrementalInOptFailover() throws Throwable {
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+ // Do a bootstrap cycle(A->B)
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Add some table & do an incremental dump.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table table1 (id int)")
+ .run("insert into table table1 values (100)")
+ .run("create table table1_managed (name string)")
+ .run("insert into table table1_managed values ('ABC')")
+ .dump(primaryDbName, withClause);
+
+ // Do an incremental load
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Get the latest notification from the notification log for the target database, just after replication.
+ CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+ // Check the tables are there post incremental load.
+ replica.run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("select id from table1")
+ .verifyResult("100")
+ .run("select name from table1_managed")
+ .verifyResult("ABC")
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do some modifications on the source cluster, so we have some entries in the table diff.
+ primary.run("use " + primaryDbName)
+ .run("create table table2_managed (id string)")
+ .run("insert into table table1_managed values ('SDC')")
+ .run("insert into table table2_managed values ('A'),('B'),('C')");
+
+
+ // Do some modifications in another database to have unrelated events as well after the last load, which should
+ // get filtered.
+
+ primary.run("create database " + extraPrimaryDb)
+ .run("use " + extraPrimaryDb)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (15),(1),(96)")
+ .run("create table t1_managed (id string)")
+ .run("insert into table t1_managed values ('SA'),('PS')");
+
+ // Do some modifications on the target database.
+ replica.run("use " + replicatedDbName)
+ .run("alter database "+ replicatedDbName + " set DBPROPERTIES
('key1'='value1')")
+ .run("alter database "+ replicatedDbName + " set DBPROPERTIES
('key2'='value2')");
+
+ // Validate the current replication id on original target has changed now.
+ assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "reverse1");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ tuple = replica.dump(replicatedDbName);
+
+ // Check event ack file should get created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Get the target event id.
+ NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
+ .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), -1,
+ new DatabaseAndTableFilter(replicatedDbName, null));
+
+ // There should be 2 events, two custom alter operations.
+ assertEquals(2, nl.getEvents().size());
+ }
+
+ @Test
+ public void testTargetEventIdGenerationInOptmisedFailover() throws Throwable {
+ // Do a cycle of bootstrap dump & load.
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+ // Do a bootstrap cycle(A->B)
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Add some table & do the first incremental dump.
+ primary.run("use " + primaryDbName)
+ .run("create external table tablei1 (id int)")
+ .run("create external table tablei2 (id int)")
+ .run("create table tablem1 (id int)")
+ .run("create table tablem2 (id int)")
+ .run("insert into table tablei1 values(1),(2),(3),(4)")
+ .run("insert into table tablei2 values(10),(20),(30),(40)")
+ .run("insert into table tablem1 values(5),(10),(15),(20)")
+ .run("insert into table tablem2 values(6),(12),(18),(24)")
+ .dump(primaryDbName, withClause);
+
+ // Do the incremental load, and check everything is intact.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use "+ replicatedDbName)
+ .run("select id from tablei1")
+ .verifyResults(new String[]{"1","2","3","4"})
+ .run("select id from tablei2")
+ .verifyResults(new String[]{"10","20","30","40"})
+ .run("select id from tablem1")
+ .verifyResults(new String[]{"5","10","15","20"})
+ .run("select id from tablem2")
+ .verifyResults(new String[]{"6","12","18","24"});
+
+ // Do some modifications & call for the second cycle of incremental dump &
load.
+ WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+ .run("create external table table1 (id int)")
+ .run("insert into table table1 values (25),(35),(82)")
+ .run("create table table1_managed (name string)")
+ .run("insert into table table1_managed values
('CAD'),('DAS'),('MSA')")
+ .run("insert into table tablei1 values(15),(62),(25),(62)")
+ .run("insert into table tablei2 values(10),(22),(11),(22)")
+ .run("insert into table tablem1 values(5),(10),(15),(20)")
+ .run("alter table table1 set TBLPROPERTIES('comment'='abc')")
+ .dump(primaryDbName, withClause);
+
+ // Do an incremental load
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Get the latest notification from the notification log for the target database, just after replication.
+ CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+
+ // Check the tables are there post incremental load.
+ replica.run("repl status " + replicatedDbName)
+ .verifyResult(tuple.lastReplicationId)
+ .run("use " + replicatedDbName)
+ .run("select id from table1")
+ .verifyResults(new String[]{"25", "35", "82"})
+ .run("select name from table1_managed")
+ .verifyResults(new String[]{"CAD", "DAS", "MSA"})
+ .verifyReplTargetProperty(replicatedDbName);
+
+ // Do some modifications on the source cluster, so we have some entries in the table diff.
+ primary.run("use " + primaryDbName)
+ .run("create table table2_managed (id string)")
+ .run("insert into table table1_managed values ('AAA'),('BBB')")
+ .run("insert into table table2_managed values
('A1'),('B1'),('C2')");
+
+
+ // Do some modifications in another database to have unrelated events as well after the last load, which should
+ // get filtered.
+
+ primary.run("create database " + extraPrimaryDb)
+ .run("use " + extraPrimaryDb)
+ .run("create external table table1 (id int)")
+ .run("insert into table table1 values (15),(1),(96)")
+ .run("create table table1_managed (id string)")
+ .run("insert into table table1_managed values ('SAA'),('PSA')");
+
+ // Do some modifications on the target database.
+ replica.run("use " + replicatedDbName)
+ .run("alter database "+ replicatedDbName + " set DBPROPERTIES
('repl1'='value1')")
+ .run("alter database "+ replicatedDbName + " set DBPROPERTIES
('repl2'='value2')");
+
+ // Validate the current replication id on original target has changed now.
+ assertNotEquals(replica.getCurrentNotificationEventId().getEventId(), notificationIdAfterRepl.getEventId());
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "reverse01");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ tuple = replica.dump(replicatedDbName, withClause);
+
+ // Check event ack file should get created.
+ assertTrue(new Path(tuple.dumpLocation, EVENT_ACK_FILE).toString() + " doesn't exist",
+ replicaFs.exists(new Path(tuple.dumpLocation, EVENT_ACK_FILE)));
+
+ // Get the target event id.
+ NotificationEventResponse nl = new HiveMetaStoreClient(replica.hiveConf)
+ .getNextNotification(Long.parseLong(getEventIdFromFile(new Path(tuple.dumpLocation), conf)[1]), 10,
+ new DatabaseAndTableFilter(replicatedDbName, null));
+
+ assertEquals(0, nl.getEventsSize());
+ }
+
+ @Test
+ public void testTargetEventIdWithNotificationsExpiredInOptimisedFailover() throws Throwable {
+ // Do a cycle of bootstrap dump & load.
+ List<String> withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + primary.repldDir + "'");
+
+ // Do a bootstrap cycle(A->B)
+ primary.dump(primaryDbName, withClause);
+ replica.load(replicatedDbName, primaryDbName, withClause);
+
+ // Add some table & do the first incremental dump.
+ primary.run("use " + primaryDbName)
+ .run("create external table tablei1 (id int)")
+ .run("create table tablem1 (id int)")
+ .run("insert into table tablei1 values(1),(2),(3),(4)")
+ .run("insert into table tablem1 values(5),(10),(15),(20)")
+ .dump(primaryDbName, withClause);
+
+ // Do the incremental load, and check everything is intact.
+ replica.load(replicatedDbName, primaryDbName, withClause)
+ .run("use "+ replicatedDbName)
+ .run("select id from tablei1")
+ .verifyResults(new String[]{"1","2","3","4"})
+ .run("select id from tablem1")
+ .verifyResults(new String[]{"5","10","15","20"});
+
+ // Explicitly make the notification logs.
+ // Get the latest notification from the notification log for the target database, just after replication.
+ CurrentNotificationEventId notificationIdAfterRepl = replica.getCurrentNotificationEventId();
+ // Inject a behaviour where some events are missing from the notification_log table.
+ // This ensures the incremental dump doesn't get all events for replication.
+ InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>
+ eventIdSkipper =
+ new InjectableBehaviourObjectStore.BehaviourInjection<NotificationEventResponse, NotificationEventResponse>() {
+
+ @Nullable
+ @Override
+ public NotificationEventResponse apply(@Nullable NotificationEventResponse eventIdList) {
+ if (null != eventIdList) {
+ List<NotificationEvent> eventIds = eventIdList.getEvents();
+ List<NotificationEvent> outEventIds = new ArrayList<>();
+ for (NotificationEvent event : eventIds) {
+ // Skip the last db event.
+ if (event.getDbName().equalsIgnoreCase(replicatedDbName)) {
+ injectionPathCalled = true;
+ continue;
+ }
+ outEventIds.add(event);
+ }
+
+ // Return the new list
+ return new NotificationEventResponse(outEventIds);
+ } else {
+ return null;
+ }
+ }
+ };
+
+ try {
+ InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(eventIdSkipper);
+
+ // Prepare for reverse replication.
+ DistributedFileSystem replicaFs = replica.miniDFSCluster.getFileSystem();
+ Path newReplDir = new Path(replica.repldDir + "reverse01");
+ replicaFs.mkdirs(newReplDir);
+ withClause = ReplicationTestUtils.includeExternalTableClause(true);
+ withClause.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + newReplDir + "'");
+
+ try {
+ replica.dump(replicatedDbName, withClause);
+ fail("Expected the dump to fail since the notification event is
missing.");
+ } catch (Exception e) {
+ // Expected due to missing notification log entry.
+ }
+
+ // Check if there is a non-recoverable error or not.
+ Path nonRecoverablePath =
+ TestReplicationScenarios.getNonRecoverablePath(newReplDir, replicatedDbName, replica.hiveConf);
+ assertTrue(replicaFs.exists(nonRecoverablePath));
+ } finally {
+ InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
+ }
+ }
+
@Test
public void testDistCpCopyWithRemoteStagingAndCopyTaskOnTarget() throws Throwable {
List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir, true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index b76354eb459..667ede3ca74 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -1088,9 +1088,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// of the ACID tables might be included for bootstrap during incremental dump. For the old policy, it's because the table
// may not satisfy the old policy but satisfies the new policy. For a filter, it may happen that the table
// was renamed and started satisfying the policy.
- return ((!work.replScope.includeAllTables())
- || (previousReplScopeModified())
- || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES));
+ return !work.replScope.includeAllTables() || previousReplScopeModified() || !tablesForBootstrap.isEmpty()
+ || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
}
private void dumpEvent(NotificationEvent ev, Path evRoot, Path dumpRoot, Path cmRoot, Hive db) throws Exception {
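For context on the ReplDumpTask change above: the added `!tablesForBootstrap.isEmpty()` clause is what lets the reverse (optimised bootstrap) dump pick up ACID tables collected via the table diff. Below is a minimal sketch of the resulting decision logic; the field names are illustrative stand-ins for the surrounding ReplDumpTask state, not Hive's API, and only the shape of the predicate comes from the diff above.

    // Illustrative sketch only; field names are assumptions standing in for
    // the ReplDumpTask state referenced in the hunk above.
    import java.util.ArrayList;
    import java.util.List;

    class AcidBootstrapDecisionSketch {
      boolean includeAllTables;        // stand-in for work.replScope.includeAllTables()
      boolean replScopeModified;       // stand-in for previousReplScopeModified()
      List<String> tablesForBootstrap = new ArrayList<>(); // tables queued by the table diff
      boolean bootstrapAcidTablesFlag; // stand-in for conf.getBoolVar(REPL_BOOTSTRAP_ACID_TABLES)

      // Mirrors the fixed predicate: ACID tables are bootstrapped during an
      // incremental dump when the policy is partial, the repl scope changed,
      // the table diff queued tables for bootstrap (the HIVE-26301 fix), or
      // the config flag explicitly requests it.
      boolean shouldBootstrapAcidTables() {
        return !includeAllTables || replScopeModified || !tablesForBootstrap.isEmpty()
            || bootstrapAcidTablesFlag;
      }
    }

Before this clause was added, a reverse dump with a non-empty table diff but an unchanged full-database policy (and the config flag unset) would skip the ACID bootstrap, which appears to be the failure mode the testReverseBootstrap changes above exercise.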