This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 18361875834 HIVE-28669: Deadlock found when TxnStoreMutex trying to
acquireLock (Zhihua Deng, reviewed by Raghav Aggarwal, Indhumathi
Muthumurugesh, Denys Kuzmenko)
18361875834 is described below
commit 18361875834951b78c65dd46d35b5a94d8c5dd63
Author: dengzh <[email protected]>
AuthorDate: Wed Jan 8 15:40:00 2025 +0800
HIVE-28669: Deadlock found when TxnStoreMutex trying to acquireLock (Zhihua
Deng, reviewed by Raghav Aggarwal, Indhumathi Muthumurugesh, Denys Kuzmenko)
Closes #5585
---
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 15 +++---
.../hadoop/hive/ql/txn/compactor/Initiator.java | 20 +++----
.../hadoop/hive/metastore/MetastoreTaskThread.java | 8 +++
.../MaterializationsRebuildLockCleanerTask.java | 16 +++---
.../hadoop/hive/metastore/MetaStoreThread.java | 8 +++
.../hive/metastore/leader/CompactorTasks.java | 1 +
.../hive/metastore/leader/HouseKeepingTasks.java | 2 +
.../hive/metastore/leader/LeaderElection.java | 9 ++++
.../hive/metastore/leader/LeaseLeaderElection.java | 8 ++-
.../metastore/leader/StaticLeaderElection.java | 7 ++-
.../hive/metastore/leader/StatsUpdaterTask.java | 1 +
.../apache/hadoop/hive/metastore/txn/NoMutex.java | 62 ++++++++++++++++++++++
.../hadoop/hive/metastore/txn/TxnStoreMutex.java | 7 ++-
.../txn/jdbc/TransactionContextManager.java | 10 ++--
.../txn/service/AcidHouseKeeperService.java | 16 +++---
.../txn/service/AcidTxnCleanerService.java | 16 +++---
16 files changed, 163 insertions(+), 43 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 3493ea4d8da..6126f150e3a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
@@ -46,6 +47,7 @@ public class Cleaner extends MetaStoreCompactorThread {
static final private String CLASS_NAME = Cleaner.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
private boolean metricsEnabled = false;
+ private boolean shouldUseMutex = true;
private ExecutorService cleanerExecutor;
private List<TaskHandler> cleanupHandlers;
@@ -70,14 +72,13 @@ public class Cleaner extends MetaStoreCompactorThread {
LOG.info("Starting Cleaner thread");
try {
do {
- TxnStore.MutexAPI.LockHandle handle = null;
+ TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() :
new NoMutex();
metadataCache.invalidate();
long startedAt = -1;
// Make sure nothing escapes this run method and kills the metastore
at large,
// so wrap it in a big catch Throwable statement.
- try {
- handle =
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
+ try (AutoCloseable closeable =
mutex.acquireLock(TxnStore.MUTEX_KEY.Cleaner.name())) {
startedAt = System.currentTimeMillis();
if (metricsEnabled) {
@@ -120,9 +121,6 @@ public class Cleaner extends MetaStoreCompactorThread {
LOG.error("Caught an exception in the main loop of compactor
cleaner, {}",
StringUtils.stringifyException(t));
} finally {
- if (handle != null) {
- handle.releaseLocks();
- }
if (metricsEnabled) {
updateCycleDurationMetric(MetricsConstants.COMPACTION_CLEANER_CYCLE_DURATION,
startedAt);
}
@@ -170,4 +168,9 @@ public class Cleaner extends MetaStoreCompactorThread {
updateCycleDurationMetric(metric, startedAt);
}
}
+
+ @Override
+ public void enforceMutex(boolean enableMutex) {
+ this.shouldUseMutex = enableMutex;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 8a1bcb98733..f166d677e40 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -56,6 +57,7 @@ public class Initiator extends MetaStoreCompactorThread {
private ExecutorService compactionExecutor;
private boolean metricsEnabled;
+ private boolean shouldUseMutex = true;
@Override
public void run() {
@@ -70,6 +72,7 @@ public class Initiator extends MetaStoreCompactorThread {
long abortedTimeThreshold = HiveConf
.getTimeVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
TimeUnit.MILLISECONDS);
+ TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() :
new NoMutex();
// Make sure we run through the loop once before checking to stop as
this makes testing
// much easier. The stop value is only for testing anyway and not used
when called from
@@ -78,13 +81,10 @@ public class Initiator extends MetaStoreCompactorThread {
PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
long startedAt = -1;
long prevStart;
- TxnStore.MutexAPI.LockHandle handle = null;
- boolean exceptionally = false;
// Wrap the inner parts of the loop in a catch throwable so that any
errors in the loop
// don't doom the entire thread.
- try {
- handle =
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
+ try (TxnStore.MutexAPI.LockHandle handle =
mutex.acquireLock(TxnStore.MUTEX_KEY.Initiator.name())) {
startedAt = System.currentTimeMillis();
prevStart = handle.getLastUpdateTime();
@@ -174,16 +174,13 @@ public class Initiator extends MetaStoreCompactorThread {
// Check for timed out remote workers.
recoverFailedCompactions(true);
+ handle.releaseLocks(startedAt);
} catch (InterruptedException e) {
// do not ignore interruption requests
return;
} catch (Throwable t) {
LOG.error("Initiator loop caught unexpected exception this time
through the loop", t);
- exceptionally = true;
} finally {
- if (handle != null) {
- if (!exceptionally) handle.releaseLocks(startedAt); else
handle.releaseLocks();
- }
if (metricsEnabled) {
perfLogger.perfLogEnd(CLASS_NAME,
MetricsConstants.COMPACTION_INITIATOR_CYCLE);
updateCycleDurationMetric(MetricsConstants.COMPACTION_INITIATOR_CYCLE_DURATION,
startedAt);
@@ -215,8 +212,6 @@ public class Initiator extends MetaStoreCompactorThread {
return CompactorUtil.resolveDatabase(conf, ci.dbname);
}
-
-
@VisibleForTesting
protected String resolveUserToRunAs(Map<String, String> cache, Table t,
Partition p)
throws IOException, InterruptedException {
@@ -428,4 +423,9 @@ public class Initiator extends MetaStoreCompactorThread {
}
}
}
+
+ @Override
+ public void enforceMutex(boolean enableMutex) {
+ this.shouldUseMutex = enableMutex;
+ }
}
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
index d56bc2ac8be..82beb909773 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MetastoreTaskThread.java
@@ -43,4 +43,12 @@ public interface MetastoreTaskThread extends Configurable,
Runnable {
default long initialDelay(TimeUnit unit) {
return runFrequency(unit);
}
+
+ /**
+ * Sets whether this task should use the mutex so that only one copy of it runs
across the warehouse.
+ * @param useMutex true for enabling the mutex, false otherwise
+ */
+ default void enforceMutex(boolean useMutex) {
+ // no-op
+ }
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
index 4c2d5e31b32..10f9721be21 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.slf4j.Logger;
@@ -37,6 +38,7 @@ public class MaterializationsRebuildLockCleanerTask
implements MetastoreTaskThre
private Configuration conf;
private TxnStore txnHandler;
+ private boolean shouldUseMutex = true;
@Override
public long runFrequency(TimeUnit unit) {
@@ -60,9 +62,8 @@ public class MaterializationsRebuildLockCleanerTask
implements MetastoreTaskThre
LOG.debug("Cleaning up materialization rebuild locks");
}
- TxnStore.MutexAPI.LockHandle handle = null;
- try {
- handle =
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name());
+ TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new
NoMutex();
+ try (AutoCloseable closeable =
mutex.acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name())) {
ValidTxnList validTxnList =
TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
long removedCnt =
txnHandler.cleanupMaterializationRebuildLocks(validTxnList,
MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT,
TimeUnit.MILLISECONDS));
@@ -73,10 +74,11 @@ public class MaterializationsRebuildLockCleanerTask
implements MetastoreTaskThre
}
} catch (Throwable t) {
LOG.error("Unexpected error in thread: {}, message: {}",
Thread.currentThread().getName(), t.getMessage(), t);
- } finally {
- if (handle != null) {
- handle.releaseLocks();
- }
}
}
+
+ @Override
+ public void enforceMutex(boolean enableMutex) {
+ this.shouldUseMutex = enableMutex;
+ }
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
index dabd61c4b46..a5c98942a5e 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreThread.java
@@ -42,4 +42,12 @@ public interface MetaStoreThread extends Configurable {
* been called.
*/
void start();
+
+ /**
+ * Sets whether this thread should use the mutex so that only one copy of it runs
across the warehouse.
+ * @param enableMutex true for enabling the mutex, false otherwise
+ */
+ default void enforceMutex(boolean enableMutex) {
+ // no-op
+ }
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
index 684862762fe..8f86e5fbc0e 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
@@ -138,6 +138,7 @@ public class CompactorTasks implements
LeaderElection.LeadershipStateListener {
AtomicBoolean flag = new AtomicBoolean();
thread.setConf(configuration);
thread.init(flag);
+ thread.enforceMutex(election.enforceMutex());
metastoreThreadsMap.put(thread, flag);
HiveMetaStore.LOG.info("Starting metastore thread of type " +
thread.getClass().getName());
thread.start();
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java
index 3a4414fd004..f09be0966ce 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java
@@ -99,6 +99,7 @@ public class HouseKeepingTasks implements
LeaderElection.LeadershipStateListener
List<MetastoreTaskThread> alwaysTasks = new
ArrayList<>(getAlwaysTasks());
for (MetastoreTaskThread task : alwaysTasks) {
task.setConf(configuration);
+ task.enforceMutex(election.enforceMutex());
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
// For backwards compatibility, since some threads used to be hard
coded but only run if
// frequency was > 0
@@ -111,6 +112,7 @@ public class HouseKeepingTasks implements
LeaderElection.LeadershipStateListener
List<MetastoreTaskThread> remoteOnlyTasks = new
ArrayList<>(getRemoteOnlyTasks());
for (MetastoreTaskThread task : remoteOnlyTasks) {
task.setConf(configuration);
+ task.enforceMutex(election.enforceMutex());
long freq = task.runFrequency(TimeUnit.MILLISECONDS);
runningTasks.add(task);
metastoreTaskThreadPool.getPool().scheduleAtFixedRate(task, freq,
freq, TimeUnit.MILLISECONDS);
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
index b6f10e5936b..5a6ab5d77bb 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaderElection.java
@@ -31,6 +31,11 @@ import java.io.Closeable;
* @param <T> the type of mutex
*/
public interface LeaderElection<T> extends Closeable {
+ // The warehouse might run different versions of HMS, or the same version but
with different
+ // leader election methods, so it is hard to
know how many HMS instances
+ // are elected as the leader. We rely on this property to tell us: the default
is true, which means multiple
+ // HMS instances may be acting as the leader.
+ static final String HIVE_TXN_ENFORCE_AUX_MUTEX =
"hive.metastore.enforce.aux.mutex";
/**
* Place where election happens
@@ -65,6 +70,10 @@ public interface LeaderElection<T> extends Closeable {
*/
public String getName();
+ default boolean enforceMutex() {
+ return true;
+ }
+
public interface LeadershipStateListener {
/**
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
index d6ad76dcce9..fc4d4078df2 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java
@@ -95,6 +95,7 @@ public class LeaseLeaderElection implements
LeaderElection<TableName> {
private String name;
private String userName;
private String hostName;
+ private boolean enforceMutex;
public LeaseLeaderElection() throws IOException {
userName = SecurityUtils.getUser();
@@ -152,7 +153,7 @@ public class LeaseLeaderElection implements
LeaderElection<TableName> {
public void tryBeLeader(Configuration conf, TableName table) throws
LeaderException {
requireNonNull(conf, "conf is null");
requireNonNull(table, "table is null");
-
+ this.enforceMutex = conf.getBoolean(HIVE_TXN_ENFORCE_AUX_MUTEX, true);
if (store == null) {
store = TxnUtils.getTxnStore(conf);
}
@@ -471,4 +472,9 @@ public class LeaseLeaderElection implements
LeaderElection<TableName> {
public String getName() {
return name;
}
+
+ @Override
+ public boolean enforceMutex() {
+ return this.enforceMutex;
+ }
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java
index 8a1752bd9a3..45917feab7e 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StaticLeaderElection.java
@@ -39,13 +39,14 @@ public class StaticLeaderElection implements
LeaderElection<String> {
private volatile boolean isLeader;
private String name;
private List<LeadershipStateListener> listeners = new ArrayList<>();
+ private boolean enforceMutex;
@Override
public void tryBeLeader(Configuration conf, String hostName)
throws LeaderException {
String leaderHost = MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME);
-
+ this.enforceMutex = conf.getBoolean(HIVE_TXN_ENFORCE_AUX_MUTEX, true);
// For the sake of backward compatibility, when the current HMS becomes
the leader when no
// leader is specified.
if (leaderHost == null || leaderHost.isEmpty()) {
@@ -103,4 +104,8 @@ public class StaticLeaderElection implements
LeaderElection<String> {
}
}
+ @Override
+ public boolean enforceMutex() {
+ return this.enforceMutex;
+ }
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java
index 9c3b754d1ab..82a49188ba6 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/StatsUpdaterTask.java
@@ -76,6 +76,7 @@ public class StatsUpdaterTask implements
LeaderElection.LeadershipStateListener
thread.setConf(configuration);
stop = new AtomicBoolean(false);
thread.init(stop);
+ thread.enforceMutex(election.enforceMutex());
HiveMetaStore.LOG.info("Starting metastore thread of type " +
thread.getClass().getName());
thread.start();
} catch (Exception e) {
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/NoMutex.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/NoMutex.java
new file mode 100644
index 00000000000..4ce65aac060
--- /dev/null
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/NoMutex.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+/**
+ * A no-op implementation of TxnStore.MutexAPI whose handles never take a real lock
+ */
+public class NoMutex implements TxnStore.MutexAPI {
+
+ @Override
+ public LockHandle acquireLock(String key) throws MetaException {
+ return new DummyHandle();
+ }
+
+ @Override
+ public void acquireLock(String key, LockHandle handle) throws MetaException {
+ // no-op
+ }
+
+ private static class DummyHandle implements LockHandle {
+
+ private long lastUpdateTime = 0L;
+
+ @Override
+ public void releaseLocks() {
+ // no-op
+ }
+
+ @Override
+ public Long getLastUpdateTime() {
+ return lastUpdateTime;
+ }
+
+ @Override
+ public void releaseLocks(Long timestamp) {
+ this.lastUpdateTime = timestamp;
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+ }
+
+}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java
index 1013493a791..3dad12fc1f5 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStoreMutex.java
@@ -142,6 +142,7 @@ public class TxnStoreMutex implements TxnStore.MutexAPI {
private final Semaphore derbySemaphore;
private final String key;
private final Long lastUpdateTime;
+ private boolean released = false;
public LockHandleImpl(MultiDataSourceJdbcResource jdbcResource,
TransactionContext context, String key,
Long lastUpdateTime, Semaphore derbySemaphore) {
@@ -166,6 +167,7 @@ public class TxnStoreMutex implements TxnStore.MutexAPI {
LOG.debug("{} unlocked by {}", key, HOSTNAME);
}
} finally {
+ released = true;
jdbcResource.unbindDataSource();
}
}
@@ -196,13 +198,16 @@ public class TxnStoreMutex implements TxnStore.MutexAPI {
LOG.debug("{} unlocked by {}", key, HOSTNAME);
}
} finally {
+ released = true;
jdbcResource.unbindDataSource();
}
}
@Override
public void close() {
- releaseLocks();
+ if (!released) {
+ releaseLocks();
+ }
}
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java
index 5be42248fe7..40daf6f1d39 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/TransactionContextManager.java
@@ -22,6 +22,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.support.DefaultTransactionDefinition;
/**
@@ -57,12 +58,15 @@ public class TransactionContextManager {
* @param propagation The transaction propagation to use.
*/
public TransactionContext getNewTransaction(int propagation) {
- TransactionContext context = new
TransactionContext(realTransactionManager.getTransaction(
- new DefaultTransactionDefinition(propagation)), this);
+ DefaultTransactionDefinition transactionDefinition = new
DefaultTransactionDefinition(propagation);
+ // The TxnStore default isolation level is READ_COMMITTED
+ transactionDefinition.setIsolationLevel(Isolation.READ_COMMITTED.value());
+ TransactionContext context = new TransactionContext(
+ realTransactionManager.getTransaction(transactionDefinition), this);
contexts.set(context);
return context;
}
-
+
public TransactionContext getActiveTransaction() {
return contexts.get();
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java
index 86799e90621..836b85851e7 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidHouseKeeperService.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.slf4j.Logger;
@@ -46,6 +47,7 @@ public class AcidHouseKeeperService implements
MetastoreTaskThread {
protected TxnStore txnHandler;
protected String serviceName;
protected Map<FailableRunnable<MetaException>, String> tasks;
+ private boolean shouldUseMutex = true;
public AcidHouseKeeperService() {
serviceName = this.getClass().getSimpleName();
@@ -78,19 +80,14 @@ public class AcidHouseKeeperService implements
MetastoreTaskThread {
@Override
public void run() {
- TxnStore.MutexAPI.LockHandle handle = null;
- try {
- handle =
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
+ TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new
NoMutex();
+ try (AutoCloseable closeable =
mutex.acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name())) {
LOG.info("Starting to run {}", serviceName);
long start = System.currentTimeMillis();
cleanTheHouse();
LOG.debug("Total time {} took: {} seconds.", serviceName,
elapsedSince(start));
} catch (Exception e) {
LOG.error("Unexpected exception in thread: {}, message: {}",
Thread.currentThread().getName(), e.getMessage(), e);
- } finally {
- if (handle != null) {
- handle.releaseLocks();
- }
}
}
@@ -107,4 +104,9 @@ public class AcidHouseKeeperService implements
MetastoreTaskThread {
private long elapsedSince(long start) {
return (System.currentTimeMillis() - start) / 1000;
}
+
+ @Override
+ public void enforceMutex(boolean enableMutex) {
+ this.shouldUseMutex = enableMutex;
+ }
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java
index 06f284faee0..766ef7b67d8 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/service/AcidTxnCleanerService.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.txn.service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.slf4j.Logger;
@@ -37,6 +38,7 @@ public class AcidTxnCleanerService implements
MetastoreTaskThread {
private Configuration conf;
private TxnStore txnHandler;
+ private boolean shouldUseMutex = true;
@Override
public void setConf(Configuration configuration) {
@@ -56,22 +58,22 @@ public class AcidTxnCleanerService implements
MetastoreTaskThread {
@Override
public void run() {
- TxnStore.MutexAPI.LockHandle handle = null;
- try {
- handle =
txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name());
+ TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new
NoMutex();
+ try (AutoCloseable closeable =
mutex.acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name())) {
long start = System.currentTimeMillis();
txnHandler.cleanEmptyAbortedAndCommittedTxns();
LOG.debug("Txn cleaner service took: {} seconds.", elapsedSince(start));
} catch (Exception e) {
LOG.error("Unexpected exception in thread: {}, message: {}",
Thread.currentThread().getName(), e.getMessage(), e);
- } finally {
- if (handle != null) {
- handle.releaseLocks();
- }
}
}
private long elapsedSince(long start) {
return (System.currentTimeMillis() - start) / 1000;
}
+
+ @Override
+ public void enforceMutex(boolean enableMutex) {
+ this.shouldUseMutex = enableMutex;
+ }
}