Repository: hive Updated Branches: refs/heads/branch-1 14ac6de6c -> d273fba8f
HIVE-13753 : Make metastore client thread safe in DbTxnManager (Wei Zheng, reviewed by Vaibhav Gumashta) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d273fba8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d273fba8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d273fba8 Branch: refs/heads/branch-1 Commit: d273fba8f54fcea887c2873ecf84c6cafe6d6aa6 Parents: 14ac6de Author: Wei Zheng <[email protected]> Authored: Mon May 16 10:24:39 2016 -0700 Committer: Wei Zheng <[email protected]> Committed: Mon May 16 10:26:19 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/lockmgr/DbLockManager.java | 4 +- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 50 +++++++++++++++++++- 2 files changed, 50 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d273fba8/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java index ad4bd4c..089a48a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java @@ -50,11 +50,11 @@ public class DbLockManager implements HiveLockManager{ private long MAX_SLEEP; //longer term we should always have a txn id and then we won't need to track locks here at all private Set<DbHiveLock> locks; - private IMetaStoreClient client; + private DbTxnManager.SynchronizedMetaStoreClient client; private long nextSleep = 50; private final HiveConf conf; - DbLockManager(IMetaStoreClient client, HiveConf conf) { + DbLockManager(DbTxnManager.SynchronizedMetaStoreClient client, HiveConf conf) { locks = new HashSet<>(); this.client = client; this.conf = conf; http://git-wip-us.apache.org/repos/asf/hive/blob/d273fba8/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index b0f1362..21b0cb2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -61,7 +61,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { static final private Log LOG = LogFactory.getLog(CLASS_NAME); private DbLockManager lockMgr = null; - private IMetaStoreClient client = null; + private SynchronizedMetaStoreClient client = null; /** * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available * transaction id. Thus is 1 is first transaction id. @@ -518,7 +518,7 @@ public class DbTxnManager extends HiveTxnManagerImpl { } try { Hive db = Hive.get(conf); - client = db.getMSC(); + client = new SynchronizedMetaStoreClient(db.getMSC()); initHeartbeatExecutorService(); } catch (MetaException e) { throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e); @@ -613,4 +613,50 @@ public class DbTxnManager extends HiveTxnManagerImpl { } } } + + /** + * Synchronized MetaStoreClient wrapper + */ + final class SynchronizedMetaStoreClient { + private final IMetaStoreClient client; + SynchronizedMetaStoreClient(IMetaStoreClient client) { + this.client = client; + } + + synchronized long openTxn(String user) throws TException { + return client.openTxn(user); + } + + synchronized void commitTxn(long txnid) throws TException { + client.commitTxn(txnid); + } + + synchronized void rollbackTxn(long txnid) throws TException { + client.rollbackTxn(txnid); + } + + synchronized void heartbeat(long txnid, long lockid) throws TException { + client.heartbeat(txnid, lockid); + } + + synchronized ValidTxnList getValidTxns(long currentTxn) throws TException { + return client.getValidTxns(currentTxn); + } + + synchronized LockResponse lock(LockRequest request) throws TException { + return client.lock(request); + } + + synchronized LockResponse checkLock(long lockid) throws TException { + return client.checkLock(lockid); + } + + synchronized void unlock(long lockid) throws TException { + client.unlock(lockid); + } + + synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException { + return client.showLocks(showLocksRequest); + } + } }
