This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 67c2d4910ff HIVE-26316: Handle dangling open txns on both src & tgt in unplanned failover. (Haymant Mangla reviewed by Peter Vary) (#3367)
67c2d4910ff is described below
commit 67c2d4910ff17c694653eb8bd9c9ed2405cec38b
Author: Haymant Mangla <[email protected]>
AuthorDate: Thu Jun 16 15:11:22 2022 +0530
    HIVE-26316: Handle dangling open txns on both src & tgt in unplanned failover. (Haymant Mangla reviewed by Peter Vary) (#3367)
---
.../parse/TestReplicationOptimisedBootstrap.java | 141 ++++++++++++++++++++-
.../hive/ql/exec/repl/OptimisedBootstrapUtils.java | 92 ++++++++++++--
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 77 +++++------
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 41 +++++-
.../hadoop/hive/ql/exec/repl/util/ReplUtils.java | 61 +++++++++
.../repl/dump/events/AbstractEventHandler.java | 11 +-
6 files changed, 349 insertions(+), 74 deletions(-)
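
In outline, the change works like this: the first reverse load aborts the dangling open write txns of the database being failed over and records every txn that was still open at the failover point in an abort_txns file beside the dump; the next reverse dump picks that file up and rolls back the matching REPL_CREATED txns that forward replication had opened on the other cluster. A minimal sketch of that round trip, using only the helper signatures added below (dumpRoot, txnManager, dbName and the event/txn lists are illustrative placeholders, not the literal task code):

    // Load side: remember which txns were still open when the old source went down.
    Set<Long> allOpenTxns = new HashSet<>(ReplUtils.getOpenTxns(validTxnList));
    OptimisedBootstrapUtils.prepareAbortTxnsFile(notificationEvents, allOpenTxns, dumpRoot, conf);

    // Reverse-dump side: read the file back and roll back the mirrored REPL_CREATED txns.
    for (Long txnId : OptimisedBootstrapUtils.getTxnIdFromAbortTxnsFile(dumpRoot, conf)) {
      txnManager.replRollbackTxn(HiveUtils.getReplPolicy(dbName), txnId);
    }
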
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 673e41b3065..dd6821dc578 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -24,12 +24,16 @@ import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.jetbrains.annotations.NotNull;
@@ -55,7 +59,6 @@ import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLI
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_COMPLETE_DIRECTORY;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.TABLE_DIFF_INPROGRESS_DIRECTORY;
-import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getEventIdFromFile;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getPathsFromTableFile;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.getTablesFromTableDiffFile;
@@ -71,6 +74,10 @@ import static org.junit.Assert.fail;
public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosAcidTables {
String extraPrimaryDb;
+ HiveConf primaryConf;
+ TxnStore txnHandler;
+ List<Long> tearDownTxns = new ArrayList<>();
+ List<Long> tearDownLockIds = new ArrayList<>();
@BeforeClass
public static void classLevelSetup() throws Exception {
@@ -90,10 +97,19 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
public void setup() throws Throwable {
super.setup();
extraPrimaryDb = "extra_" + primaryDbName;
+ primaryConf = primary.getConf();
+ txnHandler = TxnUtils.getTxnStore(primary.getConf());
}
@After
public void tearDown() throws Throwable {
+ if (!tearDownTxns.isEmpty()) {
+      //Abort leftover transactions that might not have been completed due to test failures.
+      txnHandler.abortTxns(new AbortTxnsRequest(tearDownTxns));
+    }
+    //Release any locks acquired during tests that were not released. Locks are normally released explicitly,
+    //but a test failure can leave them in a dangling state.
+ releaseLocks(txnHandler, tearDownLockIds);
primary.run("drop database if exists " + extraPrimaryDb + " cascade");
super.tearDown();
}
@@ -468,47 +484,56 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
@Test
public void testReverseBootstrap() throws Throwable {
- HiveConf primaryConf = primary.getConf();
- TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
List<String> withClause = setUpFirstIterForOptimisedBootstrap();
// Open 3 txns for Database which is not under replication
int numTxnsForSecDb = 3;
List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+ tearDownTxns.addAll(txnsForSecDb);
Map<String, Long> tablesInSecDb = new HashMap<>();
- tablesInSecDb.put("t1", (long) numTxnsForSecDb);
- tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+ tablesInSecDb.put("t1", (long) numTxnsForSecDb + 4);
+ tablesInSecDb.put("t2", (long) numTxnsForSecDb + 4);
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
    tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+ tearDownLockIds.addAll(lockIdsForSecDb);
//Open 2 txns for Primary Db
int numTxnsForPrimaryDb = 2;
List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+ tearDownTxns.addAll(txnsForSourceDb);
// Allocate write ids for both tables of source database.
Map<String, Long> tablesInSourceDb = new HashMap<>();
- tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 4);
+ tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 6);
tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb);
-    allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
+    List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
        txnsForSourceDb, replica.getConf());
+ tearDownLockIds.addAll(lockIdsForSourceDb);
//Open 1 txn with no hive locks acquired
List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+ tearDownTxns.addAll(txnsWithNoLocks);
// Do a reverse second dump, this should do a bootstrap dump for the tables in the table_diff and incremental for
// rest.
+ List<Long> allReplCreatedTxnsOnSource = getReplCreatedTxns();
+ tearDownTxns.addAll(allReplCreatedTxnsOnSource);
assertTrue("value1".equals(primary.getDatabase(primaryDbName).getParameters().get("key1")));
WarehouseInstance.Tuple tuple = replica.dump(replicatedDbName, withClause);
+ verifyAllOpenTxnsAborted(allReplCreatedTxnsOnSource, primaryConf);
+
//Verify that openTxns for sourceDb were aborted before proceeding with bootstrap dump.
verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+ txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
releaseLocks(txnHandler, lockIdsForSecDb);
+ releaseLocks(txnHandler, lockIdsForSecDb);
String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
// _bootstrap directory should be created as bootstrap enabled on external tables.
@@ -829,6 +854,35 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
primary.dump(primaryDbName, withClause);
replica.load(replicatedDbName, primaryDbName, withClause);
+ // Open 3 txns for Database which is not under replication
+ int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+ tearDownTxns.addAll(txnsForSecDb);
+
+ Map<String, Long> tablesInSecDb = new HashMap<>();
+ tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+ tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
+        tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+ tearDownLockIds.addAll(lockIdsForSecDb);
+
+ //Open 2 txns for Primary Db
+ int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForSourceDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+ tearDownTxns.addAll(txnsForSourceDb);
+
+ // Allocate write ids for both tables of source database.
+ Map<String, Long> tablesInSourceDb = new HashMap<>();
+ tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb);
+ tablesInSourceDb.put("t5", (long) numTxnsForPrimaryDb);
+    List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tablesInSourceDb, txnHandler,
+        txnsForSourceDb, primary.getConf());
+ tearDownLockIds.addAll(lockIdsForSourceDb);
+
+ //Open 1 txn with no hive locks acquired
+ List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+ tearDownTxns.addAll(txnsWithNoLocks);
+
// Create 4 managed tables and do a dump & load.
WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
@@ -850,6 +904,49 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
.verifyResult("t1").run("show tables like 't2'").verifyResult("t2").run("show tables like 't3'")
.verifyResult("t3").run("show tables like 't4'").verifyResult("t4").verifyReplTargetProperty(replicatedDbName);
+ String forwardReplPolicy = HiveUtils.getReplPolicy(replicatedDbName);
+ List<Long> targetReplCreatedTxnIds = new ArrayList<>();
+    for (Long txn: txnsForSecDb) {
+      targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
+    }
+    for (Long txn: txnsForSourceDb) {
+      targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
+    }
+    for (Long txn: txnsWithNoLocks) {
+      targetReplCreatedTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, txn));
+    }
+
+ verifyAllOpenTxnsNotAborted(targetReplCreatedTxnIds, primaryConf);
+
+ //Open New transactions on original source cluster post it went down.
+
+ // Open 1 txn for secondary Database
+ List<Long> newTxnsForSecDb = openTxns(1, txnHandler, primaryConf);
+ tearDownTxns.addAll(newTxnsForSecDb);
+
+ Map<String, Long> newTablesForSecDb = new HashMap<>();
+ newTablesForSecDb.put("t1", (long) numTxnsForSecDb + 1);
+ newTablesForSecDb.put("t2", (long) numTxnsForSecDb + 1);
+    List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
+        newTablesForSecDb, txnHandler, newTxnsForSecDb, primaryConf);
+ tearDownLockIds.addAll(newLockIdsForSecDb);
+
+ //Open 1 txn for Primary Db
+ List<Long> newTxnsForSourceDb = openTxns(1, txnHandler, primaryConf);
+ tearDownTxns.addAll(newTxnsForSourceDb);
+
+ // Allocate write ids for both tables of source database.
+ Map<String, Long> newTablesInSourceDb = new HashMap<>();
+ newTablesInSourceDb.put("t1", (long) 5);
+ newTablesInSourceDb.put("t5", (long) 3);
+    List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, newTablesInSourceDb, txnHandler,
+        newTxnsForSourceDb, primary.getConf());
+ tearDownLockIds.addAll(newLockIdsForSourceDb);
+
+ //Open 1 txn with no hive locks acquired
+ List<Long> newTxnsWithNoLock = openTxns(1, txnHandler, primaryConf);
+ tearDownTxns.addAll(newTxnsWithNoLock);
+
// Do some modifications on original source cluster. The diff becomes(tnew_managed, t1, t2, t3)
primary.run("use " + primaryDbName).run("create table tnew_managed (id int) clustered by(id) into 3 buckets " +
    "stored as orc tblproperties (\"transactional\"=\"true\")")
@@ -886,14 +983,44 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
// Do a load, this should create a table_diff_complete directory
primary.load(primaryDbName, replicatedDbName, withClause);
+ verifyAllOpenTxnsAborted(txnsForSourceDb, primaryConf);
+ verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+ verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+ verifyAllOpenTxnsAborted(newTxnsForSourceDb, primaryConf);
+ verifyAllOpenTxnsNotAborted(newTxnsForSecDb, primaryConf);
+ verifyAllOpenTxnsNotAborted(newTxnsWithNoLock, primaryConf);
+
+ txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+ releaseLocks(txnHandler, lockIdsForSecDb);
+ txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+ txnHandler.abortTxns(new AbortTxnsRequest(newTxnsForSecDb));
+ releaseLocks(txnHandler, newLockIdsForSecDb);
+ txnHandler.abortTxns(new AbortTxnsRequest(newTxnsWithNoLock));
+
// Check the table diff directory exist.
assertTrue(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY).toString() + " doesn't exist",
    replicaFs.exists(new Path(tuple.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+    assertTrue(new Path(tuple.dumpLocation, OptimisedBootstrapUtils.ABORT_TXNS_FILE).toString() + " doesn't exist",
+        replicaFs.exists(new Path(tuple.dumpLocation, OptimisedBootstrapUtils.ABORT_TXNS_FILE)));
+
+    List<Long> txnsInAbortTxnFile = OptimisedBootstrapUtils.
+        getTxnIdFromAbortTxnsFile(new Path(tuple.dumpLocation), primaryConf);
+    assertTrue (txnsInAbortTxnFile.containsAll(txnsForSourceDb));
+    assertTrue (txnsInAbortTxnFile.containsAll(txnsForSecDb));
+    assertTrue (txnsInAbortTxnFile.containsAll(txnsWithNoLocks));
+    assertEquals (txnsInAbortTxnFile.size(), txnsForSecDb.size() + txnsForSourceDb.size() + txnsWithNoLocks.size());
+
// Check the table diff has all the modified table, including the dropped and empty ones
HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(dumpPath, conf);
assertTrue("Table Diff Contains " + tableDiffEntries,
    tableDiffEntries.containsAll(Arrays.asList("tnew_managed", "t1", "t2", "t3")));
return withClause;
}
+
+ List<Long> getReplCreatedTxns() throws MetaException {
+    List<TxnType> excludedTxns = Arrays.asList(TxnType.DEFAULT, TxnType.READ_ONLY, TxnType.COMPACTION,
+ TxnType.MATER_VIEW_REBUILD, TxnType.SOFT_DELETE);
+ return txnHandler.getOpenTxns(excludedTxns).getOpen_txns();
+ }
}
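
The targetReplCreatedTxnIds assertions above rely on the TxnStore mapping between a source txn and the REPL_CREATED txn that forward replication opened for it on the target, keyed by the replication policy. Roughly (a sketch; replicatedDbName, txnsForSourceDb and txnHandler are as in the test, targetTxnIds is illustrative):

    String forwardReplPolicy = HiveUtils.getReplPolicy(replicatedDbName);
    List<Long> targetTxnIds = new ArrayList<>();
    for (Long sourceTxnId : txnsForSourceDb) {
      // Each source txn maps to its REPL_CREATED counterpart on the target cluster.
      targetTxnIds.add(txnHandler.getTargetTxnId(forwardReplPolicy, sourceTxnId));
    }
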
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
index f3aa5302832..7074226e14f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/OptimisedBootstrapUtils.java
@@ -29,6 +29,11 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
+import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -68,6 +73,9 @@ public class OptimisedBootstrapUtils {
/** table diff directory when complete */
public static final String TABLE_DIFF_COMPLETE_DIRECTORY = "table_diff_complete";
+  /** abort txns file which contains all the txns that need to be aborted on the new source cluster (initial target). */
+ public static final String ABORT_TXNS_FILE = "abort_txns";
+
/** event ack file which contains the event id till which the cluster was last loaded. */
public static final String EVENT_ACK_FILE = "event_ack";
@@ -90,6 +98,63 @@ public class OptimisedBootstrapUtils {
return fs.exists(new Path(dumpPath, fileName));
}
+  public static void prepareAbortTxnsFile(List<NotificationEvent> notificationEvents, Set<Long> allOpenTxns,
+                                          Path dumpPath, HiveConf conf) throws SemanticException {
+ if (notificationEvents.size() == 0) {
+ return;
+ }
+ Set<Long> txnsOpenedPostCurrEventId = new HashSet<>();
+    MessageDeserializer deserializer = ReplUtils.getEventDeserializer(notificationEvents.get(0));
+ for (NotificationEvent event: notificationEvents) {
+ switch (event.getEventType()) {
+ case MessageBuilder.OPEN_TXN_EVENT:
+        OpenTxnMessage openTxnMessage = deserializer.getOpenTxnMessage(event.getMessage());
+ txnsOpenedPostCurrEventId.addAll(openTxnMessage.getTxnIds());
+ allOpenTxns.removeAll(openTxnMessage.getTxnIds());
+ break;
+ case MessageBuilder.ABORT_TXN_EVENT:
+        AbortTxnMessage abortTxnMessage = deserializer.getAbortTxnMessage(event.getMessage());
+        if (!txnsOpenedPostCurrEventId.contains(abortTxnMessage.getTxnId())) {
+ allOpenTxns.add(abortTxnMessage.getTxnId());
+ }
+ break;
+ case MessageBuilder.COMMIT_TXN_EVENT:
+        CommitTxnMessage commitTxnMessage = deserializer.getCommitTxnMessage(event.getMessage());
+        if (!txnsOpenedPostCurrEventId.contains(commitTxnMessage.getTxnId())) {
+ allOpenTxns.add(commitTxnMessage.getTxnId());
+ }
+ break;
+ }
+ }
+ if (!allOpenTxns.isEmpty()) {
+      Utils.writeOutput(flattenListToString(allOpenTxns), new Path(dumpPath, ABORT_TXNS_FILE), conf);
+ }
+ }
+
+  public static List<Long> getTxnIdFromAbortTxnsFile(Path dumpPath, HiveConf conf) throws IOException {
+ String input;
+ Path abortTxnFile = new Path(dumpPath, ABORT_TXNS_FILE);
+ FileSystem fs = abortTxnFile.getFileSystem(conf);
+ try (FSDataInputStream stream = fs.open(abortTxnFile);) {
+ input = IOUtils.toString(stream, Charset.defaultCharset());
+ }
+ return unflattenListFromString(input);
+ }
+
+ private static String flattenListToString(Set<Long> list) {
+ return list.stream()
+ .map(Object::toString)
+ .collect(Collectors.joining(FILE_ENTRY_SEPARATOR));
+ }
+
+ private static List<Long> unflattenListFromString(String input) {
+ List<Long> ret = new ArrayList<>();
+    for (String val : input.replaceAll(System.lineSeparator(), "").trim().split(FILE_ENTRY_SEPARATOR)) {
+ ret.add(Long.parseLong(val));
+ }
+ return ret;
+ }
+
/**
* Gets the source & target event id from the event ack file
* @param dumpPath the dump path
@@ -201,19 +266,17 @@ public class OptimisedBootstrapUtils {
}
/**
- * Prepares the table diff file, with tables modified post the specified event id.
- * @param eventId the event id after which tables should be modified
+ * Returns list of notificationEvents starting from eventId that are related to the database.
+ * @param eventId Starting eventId
* @param hiveDb the hive object
* @param work the load work
- * @param conf hive configuration
* @throws Exception
*/
-  public static void prepareTableDiffFile(Long eventId, Hive hiveDb, ReplLoadWork work, HiveConf conf)
-      throws Exception {
-    // Get the notification events.
+  public static List<NotificationEvent> getListOfNotificationEvents(Long eventId, Hive hiveDb,
+                                                                    ReplLoadWork work) throws Exception {
List<NotificationEvent> notificationEvents =
-        hiveDb.getMSC().getNextNotification(eventId - 1, -1, new DatabaseAndTableFilter(work.dbNameToLoadIn, null))
- .getEvents();
+ hiveDb.getMSC().getNextNotification(eventId - 1, -1,
+ new DatabaseAndTableFilter(work.dbNameToLoadIn, null)).getEvents();
// Check the first eventId fetched is the same as what we fed, to ensure the events post that hasn't expired.
if (notificationEvents.get(0).getEventId() != eventId) {
@@ -222,6 +285,19 @@ public class OptimisedBootstrapUtils {
// Remove the first one, it is already loaded, we fetched it to confirm the notification events post that haven't
// expired.
notificationEvents.remove(0);
+ return notificationEvents;
+ }
+
+ /**
+   * Prepares the table diff file, with tables modified post the specified event id.
+   * @param notificationEvents Events that can possibly contain table DDL/DML metadata.
+ * @param hiveDb the hive object
+ * @param work the load work
+ * @param conf hive configuration
+ * @throws Exception
+ */
+  public static void prepareTableDiffFile(List<NotificationEvent> notificationEvents, Hive hiveDb,
+                                          ReplLoadWork work, HiveConf conf) throws Exception {
HashSet<String> modifiedTables = new HashSet<>();
for (NotificationEvent event : notificationEvents) {
String tableName = event.getTableName();
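
The refactor above splits the old prepareTableDiffFile(eventId, ...) into an event fetch plus a file write, so the same post-failover event list can also feed prepareAbortTxnsFile. A sketch of the resulting load-side call sequence (eventId, hiveDb, work and conf as the surrounding code defines them):

    // Fetch the post-failover events once, then derive the table diff from them.
    List<NotificationEvent> events = OptimisedBootstrapUtils.getListOfNotificationEvents(eventId, hiveDb, work);
    OptimisedBootstrapUtils.prepareTableDiffFile(events, hiveDb, work, conf);
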
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 667ede3ca74..ee33debe41a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.SQLAllTableConstraints;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
-import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
import org.apache.hadoop.hive.metastore.messaging.event.filters.CatalogFilter;
@@ -61,10 +60,12 @@ import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
@@ -142,6 +143,7 @@ import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.isFirs
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.NON_RECOVERABLE_MARKER;
import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.RANGER_AUTHORIZER;
+import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.getOpenTxns;
import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.cleanupSnapshots;
import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.getDFS;
import static org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils.getListFromFileList;
@@ -253,7 +255,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
} else {
// We should be here only if TableDiff is Present.
boolean isTableDiffDirectoryPresent =
-              checkFileExists(previousValidHiveDumpPath.getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+              checkFileExists(previousValidHiveDumpPath.getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+          boolean isAbortTxnsListPresent =
+              checkFileExists(previousValidHiveDumpPath.getParent(), conf, OptimisedBootstrapUtils.ABORT_TXNS_FILE);
assert isTableDiffDirectoryPresent;
@@ -267,6 +271,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// Get the tables to be bootstrapped from the table diff
tablesForBootstrap = getTablesFromTableDiffFile(previousValidHiveDumpPath.getParent(), conf);
+          if (isAbortTxnsListPresent) {
+            abortReplCreatedTxnsPriorToFailover(previousValidHiveDumpPath.getParent(), conf);
+          }
// Generate the bootstrapped table list and put it in the new dump directory for the load to consume.
createBootstrapTableList(currentDumpPath, tablesForBootstrap, conf);
@@ -327,6 +334,16 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return 0;
}
+  private void abortReplCreatedTxnsPriorToFailover(Path dumpPath, HiveConf conf) throws LockException, IOException {
+    List<Long> replCreatedTxnsToAbort = OptimisedBootstrapUtils.getTxnIdFromAbortTxnsFile(dumpPath, conf);
+    String replPolicy = HiveUtils.getReplPolicy(work.dbNameOrPattern);
+    HiveTxnManager hiveTxnManager = getTxnMgr();
+    for (Long txnId : replCreatedTxnsToAbort) {
+      LOG.info("Rolling back Repl_Created txns:" + replCreatedTxnsToAbort.toString() + " opened prior to failover.");
+      hiveTxnManager.replRollbackTxn(replPolicy, txnId);
+    }
+  }
+
private void preProcessFailoverIfRequired(Path previousValidHiveDumpDir, boolean isPrevFailoverReadyMarkerPresent)
    throws HiveException, IOException {
FileSystem fs = previousValidHiveDumpDir.getFileSystem(conf);
@@ -751,7 +768,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
work.setFailoverMetadata(fmd);
return;
}
-    List<Long> txnsForDb = getOpenTxns(getTxnMgr().getValidTxns(excludedTxns), work.dbNameOrPattern);
+    HiveTxnManager hiveTxnManager = getTxnMgr();
+    List<Long> txnsForDb = getOpenTxns(hiveTxnManager, hiveTxnManager.getValidTxns(excludedTxns), work.dbNameOrPattern);
if (!txnsForDb.isEmpty()) {
LOG.debug("Going to abort transactions: {} for database: {}.", txnsForDb, work.dbNameOrPattern);
hiveDb.abortTransactions(txnsForDb);
@@ -762,7 +780,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
List<Long> openTxns = getOpenTxns(allValidTxns);
fmd.setOpenTxns(openTxns);
fmd.setTxnsWithoutLock(getTxnsNotPresentInHiveLocksTable(openTxns));
- txnsForDb = getOpenTxns(allValidTxns, work.dbNameOrPattern);
+    txnsForDb = getOpenTxns(hiveTxnManager, allValidTxns, work.dbNameOrPattern);
if (!txnsForDb.isEmpty()) {
LOG.debug("Going to abort transactions: {} for database: {}.", txnsForDb, work.dbNameOrPattern);
hiveDb.abortTransactions(txnsForDb);
@@ -1483,33 +1501,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return !showLocksResponse.getLocks().isEmpty();
}
-  List<Long> getOpenTxns(ValidTxnList validTxnList, String dbName) throws LockException {
- HiveLockManager lockManager = getTxnMgr().getLockManager();
- long[] invalidTxns = validTxnList.getInvalidTransactions();
- List<Long> openTxns = new ArrayList<>();
- Set<Long> dbTxns = new HashSet<>();
- if (lockManager instanceof DbLockManager) {
- ShowLocksRequest request = new ShowLocksRequest();
- request.setDbname(dbName.toLowerCase());
-      ShowLocksResponse showLocksResponse = ((DbLockManager)lockManager).getLocks(request);
-      for (ShowLocksResponseElement showLocksResponseElement : showLocksResponse.getLocks()) {
- dbTxns.add(showLocksResponseElement.getTxnid());
- }
- for (long invalidTxn : invalidTxns) {
-        if (dbTxns.contains(invalidTxn) && !validTxnList.isTxnAborted(invalidTxn)) {
- openTxns.add(invalidTxn);
- }
- }
- } else {
- for (long invalidTxn : invalidTxns) {
- if (!validTxnList.isTxnAborted(invalidTxn)) {
- openTxns.add(invalidTxn);
- }
- }
- }
- return openTxns;
- }
-
// Get list of valid transactions for Repl Dump. Also wait for a given amount of time for the
// open transactions to finish. Abort any open transactions after the wait is over.
String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveException {
@@ -1522,7 +1513,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
// of time to see if all open txns < current txn is getting aborted/committed. If not, then
// we forcefully abort those txns just like AcidHouseKeeperService.
//Exclude readonly and repl created tranasactions
- ValidTxnList validTxnList = getTxnMgr().getValidTxns(excludedTxns);
+ HiveTxnManager hiveTxnManager = getTxnMgr();
+ ValidTxnList validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
while (System.currentTimeMillis() < waitUntilTime) {
//check if no open txns at all
List<Long> openTxnListForAllDbs = getOpenTxns(validTxnList);
@@ -1537,7 +1529,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
if (getTxnsNotPresentInHiveLocksTable(openTxnListForAllDbs).isEmpty()) {
//If all open txns have been inserted in the hive locks table, we just need to check for the db under replication
// If there are no txns which are open for the given db under replication, then just return it.
-      if (getOpenTxns(validTxnList, work.dbNameOrPattern).isEmpty()) {
+      if (getOpenTxns(hiveTxnManager, validTxnList, work.dbNameOrPattern).isEmpty()) {
return validTxnList.toString();
}
}
@@ -1547,17 +1539,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
} catch (InterruptedException e) {
LOG.info("REPL DUMP thread sleep interrupted", e);
}
- validTxnList = getTxnMgr().getValidTxns(excludedTxns);
+ validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
}
// After the timeout just force abort the open txns
if (conf.getBoolVar(REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT)) {
- List<Long> openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern);
+      List<Long> openTxns = getOpenTxns(hiveTxnManager, validTxnList, work.dbNameOrPattern);
if (!openTxns.isEmpty()) {
//abort only write transactions for the db under replication if abort transactions is enabled.
hiveDb.abortTransactions(openTxns);
- validTxnList = getTxnMgr().getValidTxns(excludedTxns);
- openTxns = getOpenTxns(validTxnList, work.dbNameOrPattern);
+ validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
+        openTxns = getOpenTxns(hiveTxnManager, validTxnList, work.dbNameOrPattern);
if (!openTxns.isEmpty()) {
LOG.warn("REPL DUMP unable to force abort all the open txns: {} after timeout due to unknown reasons. " +
"However, this is rare case that shouldn't happen.", openTxns);
@@ -1577,17 +1569,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
        || conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) ? SLEEP_TIME_FOR_TESTS : SLEEP_TIME;
}
- private List<Long> getOpenTxns(ValidTxnList validTxnList) {
- long[] invalidTxns = validTxnList.getInvalidTransactions();
- List<Long> openTxns = new ArrayList<>();
- for (long invalidTxn : invalidTxns) {
- if (!validTxnList.isTxnAborted(invalidTxn)) {
- openTxns.add(invalidTxn);
- }
- }
- return openTxns;
- }
-
private ReplicationSpec getNewReplicationSpec(String evState, String objState,
    boolean isMetadataOnly) {
return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 2ef04e2a306..bcef4fae57f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -17,13 +17,17 @@
*/
package org.apache.hadoop.hive.ql.exec.repl;
+import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ddl.database.alter.owner.AlterDatabaseSetOwnerDesc;
import org.apache.hadoop.hive.ql.ddl.privilege.PrincipalDesc;
import org.apache.hadoop.hive.ql.exec.repl.util.SnapshotUtils;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
import org.apache.thrift.TException;
import com.google.common.collect.Collections2;
@@ -89,6 +93,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.LinkedList;
+import java.util.Arrays;
+import java.util.Set;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
@@ -107,6 +113,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
private static final long serialVersionUID = 1L;
private final static int ZERO_TASKS = 0;
private final String STAGE_NAME = "REPL_LOAD";
+  private List<TxnType> excludedTxns = Arrays.asList(TxnType.READ_ONLY, TxnType.REPL_CREATED);
@Override
public String getName() {
@@ -724,9 +731,23 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
}
boolean isTableDiffPresent =
checkFileExists(new Path(work.dumpDirectory).getParent(), conf, TABLE_DIFF_COMPLETE_DIRECTORY);
+    boolean isAbortTxnsListPresent =
+        checkFileExists(new Path(work.dumpDirectory).getParent(), conf, OptimisedBootstrapUtils.ABORT_TXNS_FILE);
+    Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
+    List<NotificationEvent> notificationEvents = OptimisedBootstrapUtils.getListOfNotificationEvents(eventId, getHive(), work);
+ if (!isAbortTxnsListPresent) {
+      //Abort the ongoing transactions(opened prior to failover) for the target database.
+      HiveTxnManager hiveTxnManager = getTxnMgr();
+      ValidTxnList validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
+      Set<Long> allOpenTxns = new HashSet<>(ReplUtils.getOpenTxns(validTxnList));
+      abortOpenTxnsForDatabase(hiveTxnManager, validTxnList, work.dbNameToLoadIn, getHive());
+      //Re-fetch the list of notification events post failover eventId.
+      notificationEvents = OptimisedBootstrapUtils.getListOfNotificationEvents(eventId, getHive(), work);
+      OptimisedBootstrapUtils.prepareAbortTxnsFile(notificationEvents, allOpenTxns,
+          new Path(work.dumpDirectory).getParent(), conf);
+ }
if (!isTableDiffPresent) {
-      Long eventId = Long.parseLong(getEventIdFromFile(new Path(work.dumpDirectory).getParent(), conf)[0]);
- prepareTableDiffFile(eventId, getHive(), work, conf);
+ prepareTableDiffFile(notificationEvents, getHive(), work, conf);
}
if (this.childTasks == null) {
this.childTasks = new ArrayList<>();
@@ -849,6 +870,22 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
return 0;
}
+  private void abortOpenTxnsForDatabase(HiveTxnManager hiveTxnManager, ValidTxnList validTxnList, String dbName,
+                                        Hive hiveDb) throws HiveException {
+    List<Long> openTxns = ReplUtils.getOpenTxns(hiveTxnManager, validTxnList, dbName);
+ if (!openTxns.isEmpty()) {
+      LOG.info("Rolling back write txns:" + openTxns.toString() + " for the database: " + dbName);
+      //abort only write transactions for the current database if abort transactions is enabled.
+ hiveDb.abortTransactions(openTxns);
+ validTxnList = hiveTxnManager.getValidTxns(excludedTxns);
+ openTxns = ReplUtils.getOpenTxns(hiveTxnManager, validTxnList, dbName);
+ if (!openTxns.isEmpty()) {
+ LOG.warn("Unable to force abort all the open txns: {}.", openTxns);
+        throw new IllegalStateException("Failover triggered abort txns request failed for unknown reasons.");
+ }
+ }
+ }
+
private Database getSourceDbMetadata() throws IOException, SemanticException {
Path dbMetadata = new Path(work.dumpDirectory, EximUtil.METADATA_PATH_NAME);
BootstrapEventsIterator itr = new BootstrapEventsIterator(dbMetadata.toString(), work.dbNameToLoadIn,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index d059e6c2d5b..f0330e021e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.common.repl.ReplScope;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -31,6 +32,12 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
@@ -42,6 +49,10 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
+import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -78,6 +89,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Base64;
+import java.util.Set;
import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_EXECUTIONID;
import static org.apache.hadoop.hive.conf.Constants.SCHEDULED_QUERY_SCHEDULENAME;
@@ -358,6 +370,55 @@ public class ReplUtils {
return parameters != null && ReplConst.TRUE.equalsIgnoreCase(parameters.get(ReplConst.REPL_FIRST_INC_PENDING_FLAG));
}
+ public static List<Long> getOpenTxns(ValidTxnList validTxnList) {
+ long[] invalidTxns = validTxnList.getInvalidTransactions();
+ List<Long> openTxns = new ArrayList<>();
+ for (long invalidTxn : invalidTxns) {
+ if (!validTxnList.isTxnAborted(invalidTxn)) {
+ openTxns.add(invalidTxn);
+ }
+ }
+ return openTxns;
+ }
+
+  public static List<Long> getOpenTxns(HiveTxnManager hiveTxnManager, ValidTxnList validTxnList, String dbName) throws LockException {
+ HiveLockManager lockManager = hiveTxnManager.getLockManager();
+ long[] invalidTxns = validTxnList.getInvalidTransactions();
+ List<Long> openTxns = new ArrayList<>();
+ Set<Long> dbTxns = new HashSet<>();
+ if (lockManager instanceof DbLockManager) {
+ ShowLocksRequest request = new ShowLocksRequest();
+ request.setDbname(dbName.toLowerCase());
+      ShowLocksResponse showLocksResponse = ((DbLockManager)lockManager).getLocks(request);
+      for (ShowLocksResponseElement showLocksResponseElement : showLocksResponse.getLocks()) {
+ dbTxns.add(showLocksResponseElement.getTxnid());
+ }
+ for (long invalidTxn : invalidTxns) {
+        if (dbTxns.contains(invalidTxn) && !validTxnList.isTxnAborted(invalidTxn)) {
+ openTxns.add(invalidTxn);
+ }
+ }
+ } else {
+ for (long invalidTxn : invalidTxns) {
+ if (!validTxnList.isTxnAborted(invalidTxn)) {
+ openTxns.add(invalidTxn);
+ }
+ }
+ }
+ return openTxns;
+ }
+
+  public static MessageDeserializer getEventDeserializer(NotificationEvent event) {
+    try {
+      return MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
+    } catch (Exception e) {
+      String message =
+          "could not create appropriate messageFactory for format " + event.getMessageFormat();
+ LOG.error(message, e);
+ throw new IllegalStateException(message, e);
+ }
+ }
+
public static EnvironmentContext setReplDataLocationChangedFlag(EnvironmentContext envContext) {
if (envContext == null) {
envContext = new EnvironmentContext();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 6a59b2f2d8e..f488b8577f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageEncoder;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -56,14 +56,7 @@ abstract class AbstractEventHandler<T extends EventMessage> implements EventHand
AbstractEventHandler(NotificationEvent event) {
this.event = event;
- try {
-      deserializer = MessageFactory.getInstance(event.getMessageFormat()).getDeserializer();
- } catch (Exception e) {
- String message =
-          "could not create appropriate messageFactory for format " + event.getMessageFormat();
- LOG.error(message, e);
- throw new IllegalStateException(message, e);
- }
+ deserializer = ReplUtils.getEventDeserializer(event);
eventMessage = eventMessage(event.getMessage());
eventMessageAsJSON = eventMessageAsJSON(eventMessage);
}