Repository: hive
Updated Branches:
  refs/heads/master 6cb5dbe64 -> bb1ee8167
HIVE-13753 : Make metastore client thread safe in DbTxnManager (Wei Zheng, reviewed by Vaibhav Gumashta)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb1ee816
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb1ee816
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb1ee816

Branch: refs/heads/master
Commit: bb1ee8167006fb8ae7868502d95ebc31f6ea3dd5
Parents: 6cb5dbe
Author: Wei Zheng <[email protected]>
Authored: Mon May 16 10:24:39 2016 -0700
Committer: Wei Zheng <[email protected]>
Committed: Mon May 16 10:24:39 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/lockmgr/DbLockManager.java |  4 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java  | 50 +++++++++++++++++++-
 2 files changed, 50 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/bb1ee816/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index 2804514..b4ae1d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -54,11 +54,11 @@ public class DbLockManager implements HiveLockManager{
   private long MAX_SLEEP;
   //longer term we should always have a txn id and then we won't need to track locks here at all
   private Set<DbHiveLock> locks;
-  private IMetaStoreClient client;
+  private DbTxnManager.SynchronizedMetaStoreClient client;
   private long nextSleep = 50;
   private final HiveConf conf;
 
-  DbLockManager(IMetaStoreClient client, HiveConf conf) {
+  DbLockManager(DbTxnManager.SynchronizedMetaStoreClient client, HiveConf conf) {
     locks = new HashSet<>();
     this.client = client;
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/hive/blob/bb1ee816/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 4539e71..9ab6169 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -62,7 +62,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   private DbLockManager lockMgr = null;
-  private IMetaStoreClient client = null;
+  private SynchronizedMetaStoreClient client = null;
   /**
    * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available
    * transaction id. Thus is 1 is first transaction id.
@@ -520,7 +520,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       }
       try {
         Hive db = Hive.get(conf);
-        client = db.getMSC();
+        client = new SynchronizedMetaStoreClient(db.getMSC());
         initHeartbeatExecutorService();
       } catch (MetaException e) {
         throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
@@ -615,4 +615,50 @@ public class DbTxnManager extends HiveTxnManagerImpl {
       }
     }
   }
+
+  /**
+   * Synchronized MetaStoreClient wrapper
+   */
+  final class SynchronizedMetaStoreClient {
+    private final IMetaStoreClient client;
+    SynchronizedMetaStoreClient(IMetaStoreClient client) {
+      this.client = client;
+    }
+
+    synchronized long openTxn(String user) throws TException {
+      return client.openTxn(user);
+    }
+
+    synchronized void commitTxn(long txnid) throws TException {
+      client.commitTxn(txnid);
+    }
+
+    synchronized void rollbackTxn(long txnid) throws TException {
+      client.rollbackTxn(txnid);
+    }
+
+    synchronized void heartbeat(long txnid, long lockid) throws TException {
+      client.heartbeat(txnid, lockid);
+    }
+
+    synchronized ValidTxnList getValidTxns(long currentTxn) throws TException {
+      return client.getValidTxns(currentTxn);
+    }
+
+    synchronized LockResponse lock(LockRequest request) throws TException {
+      return client.lock(request);
+    }
+
+    synchronized LockResponse checkLock(long lockid) throws TException {
+      return client.checkLock(lockid);
+    }
+
+    synchronized void unlock(long lockid) throws TException {
+      client.unlock(lockid);
+    }
+
+    synchronized ShowLocksResponse showLocks(ShowLocksRequest showLocksRequest) throws TException {
+      return client.showLocks(showLocksRequest);
+    }
+  }
 }
