hmangla98 commented on a change in pull request #2121:
URL: https://github.com/apache/hive/pull/2121#discussion_r655043932
##########
File path:
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
primary.run("drop database if exists " + primaryDbName + "_extra cascade");
}
+ @Test
+ public void testFailoverDuringDump() throws Throwable {
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ WarehouseInstance.Tuple dumpData = null;
+ List<String> failoverConfigs = Arrays.asList("'" +
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+ dumpData = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets
stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .dump(primaryDbName, failoverConfigs);
+
+ //This dump is not failover ready as target db can be used for replication
only after first incremental load.
+ FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+ Path dumpPath = new Path(dumpData.dumpLocation,
ReplUtils.REPL_HIVE_BASE_DIR);
+ assertFalse(fs.exists(new Path(dumpPath,
ReplAck.FAILOVER_READY_MARKER.toString())));
+
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(dumpData.lastReplicationId);
+
+ primary.run("use " + primaryDbName)
+ .run("insert into t1 values(1)")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)");
+
+ /**There are 2 options with respect to open transactions:
+ Case 1) Txns that belong to different db or have not acquired HIVE LOCKS:
These txns would be caught in
+ _failovermetadata file.
+ Case 2) Txns that belong to db under replication: These txns would be
aborted as part of dump operation.
+ */
+ // Open 3 txns for Secondary Database
+ int numTxnsForSecDb = 3;
+ List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler,
primaryConf);
+
+ // Allocate write ids for both tables of secondary db for 3 txns
+ // t1=5 and t2=5
+ Map<String, Long> tablesInSecDb = new HashMap<>();
+ tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+ tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+ List<Long> lockIdsForSecDb =
allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+ tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+ //Open 2 txns for Primary Db
+ int numTxnsForPrimaryDb = 2;
+ List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler,
primaryConf);
+
+ // Allocate write ids for both tables of primary db for 2 txns
+ // t1=5 and t2=5
+ Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+ tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+ tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+ List<Long> lockIdsForPrimaryDb =
allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+ tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+ //Open 1 txn with no hive locks acquired
+ List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+ dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+ fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+ dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+ assertTrue(fs.exists(new Path(dumpPath,
FailoverMetaData.FAILOVER_METADATA)));
+ assertTrue(fs.exists(new Path(dumpPath,
ReplAck.FAILOVER_READY_MARKER.toString())));
+
assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+ FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+ List<Long> openTxns = failoverMD.getOpenTxns();
+ List<Long> txnsAborted = failoverMD.getAbortedTxns();
+ assertTrue(txnsAborted.size() == 2);
+ assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+ assertTrue(openTxns.size() == 4);
+ assertTrue(openTxns.containsAll(txnsForSecDb));
+ assertTrue(openTxns.containsAll(txnsWithNoLocks));
+ assertTrue(failoverMD.getTxnsWithoutLocks().equals(txnsWithNoLocks));
+
+
+ //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump
operation.
+ verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+ verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+ //Abort the txns
+ txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+ verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+ releaseLocks(txnHandler, lockIdsForSecDb);
+
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(dumpData.lastReplicationId)
+ .run("select id from t1")
+ .verifyResults(new String[]{"1"})
+ .run("select rank from t2 order by rank")
+ .verifyResults(new String[]{"10", "11"});
+
+ assertTrue(fs.exists(new Path(dumpPath,
ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+ try {
+ primary.dump(primaryDbName, failoverConfigs); //Will skip this dump
since previous dump is failover Ready.
+ assertTrue(false);
+ } catch (Exception e) {
+ Assert.assertEquals(IndexOutOfBoundsException.class, e.getClass());
+ }
+
+
assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+ dumpData = primary.dump(primaryDbName);
+ dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ assertFalse(fs.exists(new Path(dumpPath,
ReplAck.FAILOVER_READY_MARKER.toString())));
+ assertFalse(fs.exists(new Path(dumpPath,
FailoverMetaData.FAILOVER_METADATA)));
+
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+ replica.load(replicatedDbName, primaryDbName);
+
+ fs.create(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString()));
+ fs.create(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA));
+ assertTrue(fs.exists(new Path(dumpPath,
ReplAck.FAILOVER_READY_MARKER.toString())));
+ assertTrue(fs.exists(new Path(dumpPath,
FailoverMetaData.FAILOVER_METADATA)));
+
+ //Since the failover start config is disabled and previous valid dump
directory contains _failover_ready marker file
+ //So, this dump iteration will perform bootstrap dump instead of
incremental.
+ dumpData = primary.dump(primaryDbName);
+ DumpMetaData dmd = new DumpMetaData(new Path(dumpData.dumpLocation,
ReplUtils.REPL_HIVE_BASE_DIR), conf);
+ assertTrue(dmd.getDumpType() == DumpType.BOOTSTRAP);
+ }
+
+ @Test
+ public void testFailoverDuringDumpWithPreviousFailed() throws Throwable {
+ HiveConf primaryConf = primary.getConf();
+ TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+ WarehouseInstance.Tuple dumpData = null;
+ List<String> failoverConfigs = Arrays.asList("'" +
HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+ dumpData = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets
stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("create table t2 (rank int) partitioned by (name string)
tblproperties(\"transactional\"=\"true\", " +
+ "\"transactional_properties\"=\"insert_only\")")
+ .dump(primaryDbName, failoverConfigs);
+
+ //This dump is not failover ready as target db can be used for replication
only after first incremental load.
+ FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+ Path dumpPath = new Path(dumpData.dumpLocation,
ReplUtils.REPL_HIVE_BASE_DIR);
+ assertFalse(fs.exists(new Path(dumpPath,
ReplAck.FAILOVER_READY_MARKER.toString())));
+
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+ replica.load(replicatedDbName, primaryDbName)
+ .run("use " + replicatedDbName)
+ .run("show tables")
+ .verifyResults(new String[]{"t1", "t2"})
+ .run("repl status " + replicatedDbName)
+ .verifyResult(dumpData.lastReplicationId);
+
+ primary.run("use " + primaryDbName)
+ .run("insert into t1 values(1)")
+ .run("insert into t2 partition(name='Bob') values(11)")
+ .run("insert into t2 partition(name='Carl') values(10)");
+
+ FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+ Long cursorPoint = primary.getCurrentNotificationEventId().getEventId();
+ failoverMD.setCursorPoint(cursorPoint);
+ failoverMD.setOpenTxns(Arrays.asList(1L, 2L, 3L));
+ failoverMD.setAbortedTxns(new ArrayList<>());
+ Long failoverEventId =
primary.getCurrentNotificationEventId().getEventId();
+ failoverMD.setEventId(failoverEventId);
+ failoverMD.setTxnsWithoutLocks(new ArrayList<>());
+ failoverMD.write();
+ assertTrue(fs.exists(new Path(dumpPath,
FailoverMetaData.FAILOVER_METADATA)));
Review comment:
This UT is to test the case in which dump fails while dumping the events
and after capturing the failover metadata. So, the next dump operations should
use the same metadata info in next iteration.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]