This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e1025c87d8a HIVE-26177: Amendment: Initialize datanucleus compaction
pool only on HMS Leader & pool sizing (Denys Kuzmenko, reviewed by Peter Vary)
e1025c87d8a is described below
commit e1025c87d8ad664dda56382ff7e7490137ec3413
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Fri Jun 17 09:43:33 2022 +0200
HIVE-26177: Amendment: Initialize datanucleus compaction pool only on HMS
Leader & pool sizing (Denys Kuzmenko, reviewed by Peter Vary)
Closes #3372
---
.../MetastoreHousekeepingLeaderTestBase.java | 1 +
.../org/apache/hadoop/hive/ql/TestAcidOnTez.java | 7 +-
.../hive/ql/txn/compactor/CompactorOnTezTest.java | 1 +
.../txn/compactor/TestCleanerWithReplication.java | 2 +
.../hive/ql/txn/compactor/TestCompactor.java | 2 +
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 2 +
.../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 1 +
.../hive/ql/txn/compactor/CompactorTest.java | 1 +
.../hadoop/hive/metastore/conf/MetastoreConf.java | 2 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 23 ++-
.../hive/metastore/PersistenceManagerProvider.java | 195 +++++++++++----------
.../hadoop/hive/metastore/txn/TxnHandler.java | 10 +-
12 files changed, 133 insertions(+), 114 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
index d165bb2e6c5..c59384c9780 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
@@ -58,6 +58,7 @@ class MetastoreHousekeepingLeaderTestBase {
MetaStoreTestUtils.setConfForStandloneMode(conf);
MetastoreConf.setVar(conf, ConfVars.THRIFT_BIND_HOST, "localhost");
MetastoreConf.setVar(conf,
ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName);
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
addHouseKeepingThreadConfigs();
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index e9f204db292..52696384987 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -46,6 +46,7 @@ import
org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -118,9 +119,9 @@ public class TestAcidOnTez {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT,
HiveInputFormat.class.getName());
- hiveConf
- .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
TestTxnDbUtil.setConfValues(hiveConf);
hiveConf.setInt(MRJobConfig.MAP_MEMORY_MB, 1024);
hiveConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index a43fd54389b..10e88ca97ba 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -92,6 +92,7 @@ public abstract class CompactorOnTezTest {
hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT,
HiveInputFormat.class.getName());
hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
MetastoreConf.setTimeVar(hiveConf,
MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
+ MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
TestTxnDbUtil.setConfValues(hiveConf);
TestTxnDbUtil.cleanDb(hiveConf);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index 653c08ca0c6..f38eb8b02bb 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -31,6 +31,7 @@ import
org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.Table;
import static
org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -62,6 +63,7 @@ public class TestCleanerWithReplication extends CompactorTest
{
TestTxnDbUtil.cleanDb(conf);
conf.set("fs.defaultFS", fs.getUri().toString());
conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
TestTxnDbUtil.prepDb(conf);
ms = new HiveMetaStoreClient(conf);
txnHandler = TxnUtils.getTxnStore(conf);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 0127d881c4d..5673446be77 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -65,6 +65,7 @@ import
org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -139,6 +140,7 @@ public class TestCompactor {
hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT,
HiveInputFormat.class.getName());
hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false);
+ MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
TestTxnDbUtil.setConfValues(hiveConf);
TestTxnDbUtil.cleanDb(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 4c0190d5f94..51403db3cc9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -127,6 +127,8 @@ public abstract class TxnCommandsBaseForTests {
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.SPLIT_UPDATE, true);
hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
hiveConf.setBoolean("mapred.input.dir.recursive", true);
+ MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
+
TestTxnDbUtil.setConfValues(hiveConf);
txnHandler = TxnUtils.getTxnStore(hiveConf);
TestTxnDbUtil.prepDb(hiveConf);
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index f9fc37dbad8..9d95239f354 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -59,6 +59,7 @@ public abstract class DbTxnManagerEndToEndTestBase {
HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, true);
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE,
getWarehouseDir());
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
TestTxnDbUtil.setConfValues(conf);
}
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index d353f202b59..78cf8617f44 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -130,6 +130,7 @@ public abstract class CompactorTest {
protected void setup(HiveConf conf) throws Exception {
this.conf = conf;
MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT,
2, TimeUnit.SECONDS);
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
TestTxnDbUtil.setConfValues(conf);
TestTxnDbUtil.cleanDb(conf);
TestTxnDbUtil.prepDb(conf);
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 850d675e307..b59a006c98c 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -629,7 +629,7 @@ public class MetastoreConf {
"Initial value of the cleaner retry retention time. The delay has
a backoff, and calculated the following way: " +
"pow(2, number_of_failed_attempts) *
HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME."),
HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS("metastore.compactor.connectionPool.maxPoolSize",
- "hive.compactor.connectionPool.maxPoolSize", 10,
+ "hive.compactor.connectionPool.maxPoolSize", 5,
"Specify the maximum number of connections in the connection pool
used by the compactor."),
CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName",
"javax.jdo.option.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver",
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index d2a5837daa4..445ea044401 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -555,6 +555,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
protocolFactory = new TBinaryProtocol.Factory();
inputProtoFactory = new TBinaryProtocol.Factory(true, true,
maxMessageSize, maxMessageSize);
}
+
+ msHost = MetastoreConf.getVar(conf, ConfVars.THRIFT_BIND_HOST);
+ if (msHost != null && !msHost.trim().isEmpty()) {
+ LOG.info("Binding host " + msHost + " for metastore server");
+ }
+
IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
TServerSocket serverSocket;
if (useSasl) {
@@ -571,12 +577,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
LOG.info("Starting DB backed MetaStore Server");
}
}
-
- msHost = MetastoreConf.getVar(conf, ConfVars.THRIFT_BIND_HOST);
- if (msHost != null && !msHost.trim().isEmpty()) {
- LOG.info("Binding host " + msHost + " for metastore server");
- }
-
+
if (!useSSL) {
serverSocket = SecurityUtils.getServerSocket(msHost, port);
} else {
@@ -721,7 +722,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Condition startCondition = metaStoreThreadsLock.newCondition();
AtomicBoolean startedServing = new AtomicBoolean();
startMetaStoreThreads(conf, metaStoreThreadsLock, startCondition,
startedServing,
- isMetastoreHousekeepingLeader(conf, getServerHostName()),
startedBackgroundThreads);
+ isMetaStoreHousekeepingLeader(conf), startedBackgroundThreads);
signalOtherThreadsToStart(thriftServer, metaStoreThreadsLock,
startCondition, startedServing);
}
@@ -773,10 +774,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- private static boolean isMetastoreHousekeepingLeader(Configuration conf,
String serverHost) {
- String leaderHost =
- MetastoreConf.getVar(conf,
-
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME);
+ static boolean isMetaStoreHousekeepingLeader(Configuration conf) throws
Exception {
+ String leaderHost = MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME);
+ String serverHost = getServerHostName();
// For the sake of backward compatibility, when the current HMS becomes
the leader when no
// leader is specified.
@@ -785,7 +785,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
"housekeeping threads.");
return true;
}
-
LOG.info(ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME + " is set to " +
leaderHost);
return leaderHost.trim().equals(serverHost);
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
index d5b55b6e554..1c2edbbbad8 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
@@ -69,6 +69,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
+import static
org.apache.hadoop.hive.metastore.HiveMetaStore.isMetaStoreHousekeepingLeader;
+
/**
* This class is a wrapper class around PersistenceManagerFactory and its
properties
* These objects are static and need to be carefully modified together such
that there are no
@@ -160,14 +162,14 @@ public class PersistenceManagerProvider {
boolean readLockAcquired = true;
try {
// if pmf properties change, need to update, release read lock and take
write lock
- if (prop == null || pmf == null || !propsFromConf.equals(prop)) {
+ if (pmf == null || !propsFromConf.equals(prop)) {
pmfReadLock.unlock();
readLockAcquired = false;
pmfWriteLock.lock();
try {
// check if we need to update pmf again here in case some other
thread already did it
// for us after releasing readlock and before acquiring write lock
above
- if (prop == null || pmf == null || !propsFromConf.equals(prop)) {
+ if (pmf == null || !propsFromConf.equals(prop)) {
// OK, now we really need to re-initialize pmf and pmf properties
if (LOG.isInfoEnabled()) {
LOG.info("Updating the pmf due to property change");
@@ -195,7 +197,7 @@ public class PersistenceManagerProvider {
}
}
if (pmf != null) {
- clearOutPmfClassLoaderCache();
+ clearOutPmfClassLoaderCache(pmf);
if (!forTwoMetastoreTesting) {
// close the underlying connection pool to avoid leaks
LOG.debug("Closing PersistenceManagerFactory");
@@ -210,8 +212,15 @@ public class PersistenceManagerProvider {
retryInterval = MetastoreConf
.getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL,
TimeUnit.MILLISECONDS);
// init PMF with retry logic
- pmf = retry(() -> initPMF(conf, true));
- compactorPmf = retry(() -> initPMF(conf, false));
+ pmf = retry(() -> initPMF(conf, false));
+ try {
+ if (isMetaStoreHousekeepingLeader(conf) &&
MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)
+ && compactorPmf == null) {
+ compactorPmf = retry(() -> initPMF(conf, true));
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ }
}
// downgrade by acquiring read lock before releasing write lock
pmfReadLock.lock();
@@ -231,21 +240,24 @@ public class PersistenceManagerProvider {
DataSourceProvider dsp =
DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
PersistenceManagerFactory pmf;
+ // Any preexisting datanucleus property should be passed along
+ Map<Object, Object> dsProp = new HashMap<>(prop);
+ int maxPoolSize = -1;
+
+ if (forCompactor) {
+ maxPoolSize = MetastoreConf.getIntVar(conf,
ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS);
+ dsProp.put(ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(),
maxPoolSize);
+ }
if (dsp == null) {
- pmf = JDOHelper.getPersistenceManagerFactory(prop);
+ pmf = JDOHelper.getPersistenceManagerFactory(dsProp);
} else {
try {
- DataSource ds =
- forCompactor ? dsp.create(conf, MetastoreConf.getIntVar(conf,
ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS)) :
- dsp.create(conf);
- Map<Object, Object> dsProperties = new HashMap<>();
- //Any preexisting datanucleus property should be passed along
- dsProperties.putAll(prop);
- dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds);
- dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds);
- dsProperties.put(ConfVars.MANAGER_FACTORY_CLASS.getVarname(),
+ DataSource ds = (maxPoolSize > 0) ? dsp.create(conf, maxPoolSize) :
dsp.create(conf);
+ dsProp.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds);
+ dsProp.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds);
+ dsProp.put(ConfVars.MANAGER_FACTORY_CLASS.getVarname(),
"org.datanucleus.api.jdo.JDOPersistenceManagerFactory");
- pmf = JDOHelper.getPersistenceManagerFactory(dsProperties);
+ pmf = JDOHelper.getPersistenceManagerFactory(dsProp);
} catch (SQLException e) {
LOG.warn("Could not create PersistenceManagerFactory using "
+ "connection pool properties, will fall back", e);
@@ -291,84 +303,89 @@ public class PersistenceManagerProvider {
public static void clearOutPmfClassLoaderCache() {
pmfWriteLock.lock();
try {
- if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) {
- return;
- }
- // NOTE : This is hacky, and this section of code is fragile depending
on DN code varnames
- // so it's likely to stop working at some time in the future, especially
if we upgrade DN
- // versions, so we actively need to find a better way to make sure the
leak doesn't happen
- // instead of just clearing out the cache after every call.
- JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
- NucleusContext nc = jdoPmf.getNucleusContext();
- try {
- Field pmCache = pmf.getClass().getDeclaredField("pmCache");
- pmCache.setAccessible(true);
- Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>)
pmCache.get(pmf);
- for (JDOPersistenceManager pm : pmSet) {
- org.datanucleus.ExecutionContext ec = pm.getExecutionContext();
- if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) {
- ClassLoaderResolver clr =
- ((org.datanucleus.ExecutionContextThreadedImpl)
ec).getClassLoaderResolver();
- clearClr(clr);
- }
- }
- org.datanucleus.plugin.PluginManager pluginManager =
- jdoPmf.getNucleusContext().getPluginManager();
- Field registryField =
pluginManager.getClass().getDeclaredField("registry");
- registryField.setAccessible(true);
- org.datanucleus.plugin.PluginRegistry registry =
- (org.datanucleus.plugin.PluginRegistry)
registryField.get(pluginManager);
- if (registry instanceof
org.datanucleus.plugin.NonManagedPluginRegistry) {
- org.datanucleus.plugin.NonManagedPluginRegistry nRegistry =
- (org.datanucleus.plugin.NonManagedPluginRegistry) registry;
- Field clrField = nRegistry.getClass().getDeclaredField("clr");
- clrField.setAccessible(true);
- ClassLoaderResolver clr = (ClassLoaderResolver)
clrField.get(nRegistry);
+ clearOutPmfClassLoaderCache(pmf);
+ clearOutPmfClassLoaderCache(compactorPmf);
+ } finally {
+ pmfWriteLock.unlock();
+ }
+ }
+
+ private static void clearOutPmfClassLoaderCache (PersistenceManagerFactory
pmf) {
+ if (!(pmf instanceof JDOPersistenceManagerFactory)) {
+ return;
+ }
+ // NOTE : This is hacky, and this section of code is fragile depending on
DN code varnames
+ // so it's likely to stop working at some time in the future, especially
if we upgrade DN
+ // versions, so we actively need to find a better way to make sure the
leak doesn't happen
+ // instead of just clearing out the cache after every call.
+ JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
+ NucleusContext nc = jdoPmf.getNucleusContext();
+ try {
+ Field pmCache = pmf.getClass().getDeclaredField("pmCache");
+ pmCache.setAccessible(true);
+ Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>)
pmCache.get(pmf);
+ for (JDOPersistenceManager pm : pmSet) {
+ org.datanucleus.ExecutionContext ec = pm.getExecutionContext();
+ if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) {
+ ClassLoaderResolver clr =
+ ((org.datanucleus.ExecutionContextThreadedImpl)
ec).getClassLoaderResolver();
clearClr(clr);
}
- if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) {
- org.datanucleus.PersistenceNucleusContextImpl pnc =
- (org.datanucleus.PersistenceNucleusContextImpl) nc;
- org.datanucleus.store.types.TypeManagerImpl tm =
- (org.datanucleus.store.types.TypeManagerImpl)
pnc.getTypeManager();
- Field clrField = tm.getClass().getDeclaredField("clr");
+ }
+ org.datanucleus.plugin.PluginManager pluginManager =
+ jdoPmf.getNucleusContext().getPluginManager();
+ Field registryField =
pluginManager.getClass().getDeclaredField("registry");
+ registryField.setAccessible(true);
+ org.datanucleus.plugin.PluginRegistry registry =
+ (org.datanucleus.plugin.PluginRegistry)
registryField.get(pluginManager);
+ if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry)
{
+ org.datanucleus.plugin.NonManagedPluginRegistry nRegistry =
+ (org.datanucleus.plugin.NonManagedPluginRegistry) registry;
+ Field clrField = nRegistry.getClass().getDeclaredField("clr");
+ clrField.setAccessible(true);
+ ClassLoaderResolver clr = (ClassLoaderResolver)
clrField.get(nRegistry);
+ clearClr(clr);
+ }
+ if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) {
+ org.datanucleus.PersistenceNucleusContextImpl pnc =
+ (org.datanucleus.PersistenceNucleusContextImpl) nc;
+ org.datanucleus.store.types.TypeManagerImpl tm =
+ (org.datanucleus.store.types.TypeManagerImpl) pnc.getTypeManager();
+ Field clrField = tm.getClass().getDeclaredField("clr");
+ clrField.setAccessible(true);
+ ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(tm);
+ clearClr(clr);
+ Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr");
+ storeMgrField.setAccessible(true);
+ org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr =
+ (org.datanucleus.store.rdbms.RDBMSStoreManager)
storeMgrField.get(pnc);
+ Field backingStoreField =
+ storeMgr.getClass().getDeclaredField("backingStoreByMemberName");
+ backingStoreField.setAccessible(true);
+ Map<String, Store> backingStoreByMemberName =
+ (Map<String, Store>) backingStoreField.get(storeMgr);
+ for (Store store : backingStoreByMemberName.values()) {
+ org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore =
+ (org.datanucleus.store.rdbms.scostore.BaseContainerStore) store;
+ clrField =
org.datanucleus.store.rdbms.scostore.BaseContainerStore.class
+ .getDeclaredField("clr");
clrField.setAccessible(true);
- ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(tm);
- clearClr(clr);
- Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr");
- storeMgrField.setAccessible(true);
- org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr =
- (org.datanucleus.store.rdbms.RDBMSStoreManager)
storeMgrField.get(pnc);
- Field backingStoreField =
- storeMgr.getClass().getDeclaredField("backingStoreByMemberName");
- backingStoreField.setAccessible(true);
- Map<String, Store> backingStoreByMemberName =
- (Map<String, Store>) backingStoreField.get(storeMgr);
- for (Store store : backingStoreByMemberName.values()) {
- org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore =
- (org.datanucleus.store.rdbms.scostore.BaseContainerStore)
store;
- clrField =
org.datanucleus.store.rdbms.scostore.BaseContainerStore.class
- .getDeclaredField("clr");
- clrField.setAccessible(true);
- clr = (ClassLoaderResolver) clrField.get(baseStore);
- clearClr(clr);
- }
- }
- Field classLoaderResolverMap =
-
AbstractNucleusContext.class.getDeclaredField("classLoaderResolverMap");
- classLoaderResolverMap.setAccessible(true);
- Map<String, ClassLoaderResolver> loaderMap =
- (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc);
- for (ClassLoaderResolver clr : loaderMap.values()) {
+ clr = (ClassLoaderResolver) clrField.get(baseStore);
clearClr(clr);
}
- classLoaderResolverMap.set(nc, new HashMap<String,
ClassLoaderResolver>());
- LOG.debug("Removed cached classloaders from DataNucleus
NucleusContext");
- } catch (Exception e) {
- LOG.warn("Failed to remove cached classloaders from DataNucleus
NucleusContext", e);
}
- } finally {
- pmfWriteLock.unlock();
+ Field classLoaderResolverMap =
+
AbstractNucleusContext.class.getDeclaredField("classLoaderResolverMap");
+ classLoaderResolverMap.setAccessible(true);
+ Map<String, ClassLoaderResolver> loaderMap =
+ (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc);
+ for (ClassLoaderResolver clr : loaderMap.values()) {
+ clearClr(clr);
+ }
+ classLoaderResolverMap.set(nc, new HashMap<String,
ClassLoaderResolver>());
+ LOG.debug("Removed cached classloaders from DataNucleus NucleusContext");
+ } catch (Exception e) {
+ LOG.warn("Failed to remove cached classloaders from DataNucleus
NucleusContext", e);
}
}
@@ -411,7 +428,7 @@ public class PersistenceManagerProvider {
public static PersistenceManager getPersistenceManager(boolean forCompactor)
{
pmfReadLock.lock();
try {
- if ((!forCompactor && pmf == null) || (forCompactor && compactorPmf ==
null)) {
+ if (pmf == null || (forCompactor && compactorPmf == null)) {
throw new RuntimeException(
"Cannot create PersistenceManager. PersistenceManagerFactory is
not yet initialized");
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 74e854944b8..a52b9aec0a7 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -379,16 +379,8 @@ abstract class TxnHandler implements TxnStore,
TxnStore.MutexAPI {
if (connPool == null) {
connPool = setupJdbcConnectionPool(conf, maxPoolSize);
}
-
if (connPoolMutex == null) {
- /*the mutex pools should ideally be somewhat larger since some
operations require 1
- connection from each pool and we want to avoid taking a connection
from primary pool
- and then blocking because mutex pool is empty. There is only 1
thread in any HMS trying
- to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock. The
CheckLock operation gets a
- connection from connPool first, then connPoolMutex. All others, go
in the opposite
- order (not very elegant...). So number of connection requests for
connPoolMutex cannot
- exceed (size of connPool + MUTEX_KEY.values().length - 1).*/
- connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize +
MUTEX_KEY.values().length);
+ connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize);
}
if (dbProduct == null) {