This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e1025c87d8a HIVE-26177: Amendment: Initialize datanucleus compaction pool only on HMS Leader & pool sizing (Denys Kuzmenko, reviewed by Peter Vary)
e1025c87d8a is described below

commit e1025c87d8ad664dda56382ff7e7490137ec3413
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Fri Jun 17 09:43:33 2022 +0200

    HIVE-26177: Amendment: Initialize datanucleus compaction pool only on HMS Leader & pool sizing (Denys Kuzmenko, reviewed by Peter Vary)
    
    Closes #3372
---
 .../MetastoreHousekeepingLeaderTestBase.java       |   1 +
 .../org/apache/hadoop/hive/ql/TestAcidOnTez.java   |   7 +-
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  |   1 +
 .../txn/compactor/TestCleanerWithReplication.java  |   2 +
 .../hive/ql/txn/compactor/TestCompactor.java       |   2 +
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |   2 +
 .../ql/lockmgr/DbTxnManagerEndToEndTestBase.java   |   1 +
 .../hive/ql/txn/compactor/CompactorTest.java       |   1 +
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   2 +-
 .../hadoop/hive/metastore/HiveMetaStore.java       |  23 ++-
 .../hive/metastore/PersistenceManagerProvider.java | 195 +++++++++++----------
 .../hadoop/hive/metastore/txn/TxnHandler.java      |  10 +-
 12 files changed, 133 insertions(+), 114 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
index d165bb2e6c5..c59384c9780 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
@@ -58,6 +58,7 @@ class MetastoreHousekeepingLeaderTestBase {
     MetaStoreTestUtils.setConfForStandloneMode(conf);
     MetastoreConf.setVar(conf, ConfVars.THRIFT_BIND_HOST, "localhost");
     MetastoreConf.setVar(conf, ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME, leaderHostName);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
 
     addHouseKeepingThreadConfigs();
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index e9f204db292..52696384987 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -118,9 +119,9 @@ public class TestAcidOnTez {
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
     hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
-    hiveConf
-        .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-            "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     TestTxnDbUtil.setConfValues(hiveConf);
     hiveConf.setInt(MRJobConfig.MAP_MEMORY_MB, 1024);
     hiveConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index a43fd54389b..10e88ca97ba 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -92,6 +92,7 @@ public abstract class CompactorOnTezTest {
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
     hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
     MetastoreConf.setTimeVar(hiveConf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
 
     TestTxnDbUtil.setConfValues(hiveConf);
     TestTxnDbUtil.cleanDb(hiveConf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index 653c08ca0c6..f38eb8b02bb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -62,6 +63,7 @@ public class TestCleanerWithReplication extends CompactorTest {
     TestTxnDbUtil.cleanDb(conf);
     conf.set("fs.defaultFS", fs.getUri().toString());
     conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     TestTxnDbUtil.prepDb(conf);
     ms = new HiveMetaStoreClient(conf);
     txnHandler = TxnUtils.getTxnStore(conf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 0127d881c4d..5673446be77 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -139,6 +140,7 @@ public class TestCompactor {
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false);
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
 
     TestTxnDbUtil.setConfValues(hiveConf);
     TestTxnDbUtil.cleanDb(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 4c0190d5f94..51403db3cc9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -127,6 +127,8 @@ public abstract class TxnCommandsBaseForTests {
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.SPLIT_UPDATE, true);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
     hiveConf.setBoolean("mapred.input.dir.recursive", true);
+    MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
+      
     TestTxnDbUtil.setConfValues(hiveConf);
     txnHandler = TxnUtils.getTxnStore(hiveConf);
     TestTxnDbUtil.prepDb(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index f9fc37dbad8..9d95239f354 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -59,6 +59,7 @@ public abstract class DbTxnManagerEndToEndTestBase {
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, true);
 
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE, getWarehouseDir());
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     TestTxnDbUtil.setConfValues(conf);
   }
   
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index d353f202b59..78cf8617f44 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -130,6 +130,7 @@ public abstract class CompactorTest {
   protected void setup(HiveConf conf) throws Exception {
     this.conf = conf;
     MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
+    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     TestTxnDbUtil.setConfValues(conf);
     TestTxnDbUtil.cleanDb(conf);
     TestTxnDbUtil.prepDb(conf);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 850d675e307..b59a006c98c 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -629,7 +629,7 @@ public class MetastoreConf {
             "Initial value of the cleaner retry retention time. The delay has 
a backoff, and calculated the following way: " +
             "pow(2, number_of_failed_attempts) * 
HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME."),
     
HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS("metastore.compactor.connectionPool.maxPoolSize",
-            "hive.compactor.connectionPool.maxPoolSize", 10,
+            "hive.compactor.connectionPool.maxPoolSize", 5,
             "Specify the maximum number of connections in the connection pool 
used by the compactor."),
     CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName",
         "javax.jdo.option.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver",
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index d2a5837daa4..445ea044401 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -555,6 +555,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       protocolFactory = new TBinaryProtocol.Factory();
      inputProtoFactory = new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize);
     }
+    
+    msHost = MetastoreConf.getVar(conf, ConfVars.THRIFT_BIND_HOST);
+    if (msHost != null && !msHost.trim().isEmpty()) {
+      LOG.info("Binding host " + msHost + " for metastore server");
+    }
+    
     IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
     TServerSocket serverSocket;
     if (useSasl) {
@@ -571,12 +577,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         LOG.info("Starting DB backed MetaStore Server");
       }
     }
-
-    msHost = MetastoreConf.getVar(conf, ConfVars.THRIFT_BIND_HOST);
-    if (msHost != null && !msHost.trim().isEmpty()) {
-      LOG.info("Binding host " + msHost + " for metastore server");
-    }
-
+    
     if (!useSSL) {
       serverSocket = SecurityUtils.getServerSocket(msHost, port);
     } else {
@@ -721,7 +722,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Condition startCondition = metaStoreThreadsLock.newCondition();
       AtomicBoolean startedServing = new AtomicBoolean();
       startMetaStoreThreads(conf, metaStoreThreadsLock, startCondition, startedServing,
-                isMetastoreHousekeepingLeader(conf, getServerHostName()), startedBackgroundThreads);
+          isMetaStoreHousekeepingLeader(conf), startedBackgroundThreads);
       signalOtherThreadsToStart(thriftServer, metaStoreThreadsLock, startCondition, startedServing);
     }
 
@@ -773,10 +774,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
   }
 
-  private static boolean isMetastoreHousekeepingLeader(Configuration conf, String serverHost) {
-    String leaderHost =
-            MetastoreConf.getVar(conf,
-                    MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME);
+  static boolean isMetaStoreHousekeepingLeader(Configuration conf) throws Exception {
+    String leaderHost = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME);
+    String serverHost = getServerHostName();
 
     // For the sake of backward compatibility, when the current HMS becomes the leader when no
     // leader is specified.
@@ -785,7 +785,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               "housekeeping threads.");
       return true;
     }
-
     LOG.info(ConfVars.METASTORE_HOUSEKEEPING_LEADER_HOSTNAME + " is set to " + 
leaderHost);
     return leaderHost.trim().equals(serverHost);
   }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
index d5b55b6e554..1c2edbbbad8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PersistenceManagerProvider.java
@@ -69,6 +69,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Supplier;
 
+import static org.apache.hadoop.hive.metastore.HiveMetaStore.isMetaStoreHousekeepingLeader;
+
 /**
  * This class is a wrapper class around PersistenceManagerFactory and its properties
  * These objects are static and need to be carefully modified together such that there are no
@@ -160,14 +162,14 @@ public class PersistenceManagerProvider {
     boolean readLockAcquired = true;
     try {
       // if pmf properties change, need to update, release read lock and take write lock
-      if (prop == null || pmf == null || !propsFromConf.equals(prop)) {
+      if (pmf == null || !propsFromConf.equals(prop)) {
         pmfReadLock.unlock();
         readLockAcquired = false;
         pmfWriteLock.lock();
         try {
           // check if we need to update pmf again here in case some other thread already did it
           // for us after releasing readlock and before acquiring write lock above
-          if (prop == null || pmf == null || !propsFromConf.equals(prop)) {
+          if (pmf == null || !propsFromConf.equals(prop)) {
             // OK, now we really need to re-initialize pmf and pmf properties
             if (LOG.isInfoEnabled()) {
               LOG.info("Updating the pmf due to property change");
@@ -195,7 +197,7 @@ public class PersistenceManagerProvider {
               }
             }
             if (pmf != null) {
-              clearOutPmfClassLoaderCache();
+              clearOutPmfClassLoaderCache(pmf);
               if (!forTwoMetastoreTesting) {
                 // close the underlying connection pool to avoid leaks
                 LOG.debug("Closing PersistenceManagerFactory");
@@ -210,8 +212,15 @@ public class PersistenceManagerProvider {
             retryInterval = MetastoreConf
                 .getTimeVar(conf, ConfVars.HMS_HANDLER_INTERVAL, TimeUnit.MILLISECONDS);
             // init PMF with retry logic
-            pmf = retry(() -> initPMF(conf, true));
-            compactorPmf = retry(() -> initPMF(conf, false));
+            pmf = retry(() -> initPMF(conf, false));
+            try {
+              if (isMetaStoreHousekeepingLeader(conf) && 
MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON)
+                    && compactorPmf == null) {
+                compactorPmf = retry(() -> initPMF(conf, true));
+              }
+            } catch (Exception e) {
+              LOG.error(e.getMessage());
+            }
           }
           // downgrade by acquiring read lock before releasing write lock
           pmfReadLock.lock();
@@ -231,21 +240,24 @@ public class PersistenceManagerProvider {
     DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
     PersistenceManagerFactory pmf;
 
+    // Any preexisting datanucleus property should be passed along
+    Map<Object, Object> dsProp = new HashMap<>(prop);
+    int maxPoolSize = -1;
+    
+    if (forCompactor) {
+      maxPoolSize = MetastoreConf.getIntVar(conf, ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS);
+      dsProp.put(ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(), maxPoolSize);
+    }
     if (dsp == null) {
-      pmf = JDOHelper.getPersistenceManagerFactory(prop);
+      pmf = JDOHelper.getPersistenceManagerFactory(dsProp);
     } else {
       try {
-        DataSource ds =
-                forCompactor ? dsp.create(conf, MetastoreConf.getIntVar(conf, ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS)) :
-                        dsp.create(conf);
-        Map<Object, Object> dsProperties = new HashMap<>();
-        //Any preexisting datanucleus property should be passed along
-        dsProperties.putAll(prop);
-        dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds);
-        dsProperties.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds);
-        dsProperties.put(ConfVars.MANAGER_FACTORY_CLASS.getVarname(),
+        DataSource ds = (maxPoolSize > 0) ? dsp.create(conf, maxPoolSize) : 
dsp.create(conf);
+        dsProp.put(PropertyNames.PROPERTY_CONNECTION_FACTORY, ds);
+        dsProp.put(PropertyNames.PROPERTY_CONNECTION_FACTORY2, ds);
+        dsProp.put(ConfVars.MANAGER_FACTORY_CLASS.getVarname(),
             "org.datanucleus.api.jdo.JDOPersistenceManagerFactory");
-        pmf = JDOHelper.getPersistenceManagerFactory(dsProperties);
+        pmf = JDOHelper.getPersistenceManagerFactory(dsProp);
       } catch (SQLException e) {
         LOG.warn("Could not create PersistenceManagerFactory using "
             + "connection pool properties, will fall back", e);
@@ -291,84 +303,89 @@ public class PersistenceManagerProvider {
   public static void clearOutPmfClassLoaderCache() {
     pmfWriteLock.lock();
     try {
-      if ((pmf == null) || (!(pmf instanceof JDOPersistenceManagerFactory))) {
-        return;
-      }
-      // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames
-      // so it's likely to stop working at some time in the future, especially if we upgrade DN
-      // versions, so we actively need to find a better way to make sure the leak doesn't happen
-      // instead of just clearing out the cache after every call.
-      JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
-      NucleusContext nc = jdoPmf.getNucleusContext();
-      try {
-        Field pmCache = pmf.getClass().getDeclaredField("pmCache");
-        pmCache.setAccessible(true);
-        Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>) 
pmCache.get(pmf);
-        for (JDOPersistenceManager pm : pmSet) {
-          org.datanucleus.ExecutionContext ec = pm.getExecutionContext();
-          if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) {
-            ClassLoaderResolver clr =
-                ((org.datanucleus.ExecutionContextThreadedImpl) ec).getClassLoaderResolver();
-            clearClr(clr);
-          }
-        }
-        org.datanucleus.plugin.PluginManager pluginManager =
-            jdoPmf.getNucleusContext().getPluginManager();
-        Field registryField = pluginManager.getClass().getDeclaredField("registry");
-        registryField.setAccessible(true);
-        org.datanucleus.plugin.PluginRegistry registry =
-            (org.datanucleus.plugin.PluginRegistry) registryField.get(pluginManager);
-        if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) {
-          org.datanucleus.plugin.NonManagedPluginRegistry nRegistry =
-              (org.datanucleus.plugin.NonManagedPluginRegistry) registry;
-          Field clrField = nRegistry.getClass().getDeclaredField("clr");
-          clrField.setAccessible(true);
-          ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(nRegistry);
+      clearOutPmfClassLoaderCache(pmf);
+      clearOutPmfClassLoaderCache(compactorPmf);
+    } finally {
+      pmfWriteLock.unlock();
+    }
+  }
+  
+  private static void clearOutPmfClassLoaderCache (PersistenceManagerFactory pmf) {
+    if (!(pmf instanceof JDOPersistenceManagerFactory)) {
+      return;
+    }
+    // NOTE : This is hacky, and this section of code is fragile depending on DN code varnames
+    // so it's likely to stop working at some time in the future, especially if we upgrade DN
+    // versions, so we actively need to find a better way to make sure the leak doesn't happen
+    // instead of just clearing out the cache after every call.
+    JDOPersistenceManagerFactory jdoPmf = (JDOPersistenceManagerFactory) pmf;
+    NucleusContext nc = jdoPmf.getNucleusContext();
+    try {
+      Field pmCache = pmf.getClass().getDeclaredField("pmCache");
+      pmCache.setAccessible(true);
+      Set<JDOPersistenceManager> pmSet = (Set<JDOPersistenceManager>) 
pmCache.get(pmf);
+      for (JDOPersistenceManager pm : pmSet) {
+        org.datanucleus.ExecutionContext ec = pm.getExecutionContext();
+        if (ec instanceof org.datanucleus.ExecutionContextThreadedImpl) {
+          ClassLoaderResolver clr =
+              ((org.datanucleus.ExecutionContextThreadedImpl) ec).getClassLoaderResolver();
           clearClr(clr);
         }
-        if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) {
-          org.datanucleus.PersistenceNucleusContextImpl pnc =
-              (org.datanucleus.PersistenceNucleusContextImpl) nc;
-          org.datanucleus.store.types.TypeManagerImpl tm =
-              (org.datanucleus.store.types.TypeManagerImpl) pnc.getTypeManager();
-          Field clrField = tm.getClass().getDeclaredField("clr");
+      }
+      org.datanucleus.plugin.PluginManager pluginManager =
+          jdoPmf.getNucleusContext().getPluginManager();
+      Field registryField = pluginManager.getClass().getDeclaredField("registry");
+      registryField.setAccessible(true);
+      org.datanucleus.plugin.PluginRegistry registry =
+          (org.datanucleus.plugin.PluginRegistry) registryField.get(pluginManager);
+      if (registry instanceof org.datanucleus.plugin.NonManagedPluginRegistry) {
+        org.datanucleus.plugin.NonManagedPluginRegistry nRegistry =
+            (org.datanucleus.plugin.NonManagedPluginRegistry) registry;
+        Field clrField = nRegistry.getClass().getDeclaredField("clr");
+        clrField.setAccessible(true);
+        ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(nRegistry);
+        clearClr(clr);
+      }
+      if (nc instanceof org.datanucleus.PersistenceNucleusContextImpl) {
+        org.datanucleus.PersistenceNucleusContextImpl pnc =
+            (org.datanucleus.PersistenceNucleusContextImpl) nc;
+        org.datanucleus.store.types.TypeManagerImpl tm =
+            (org.datanucleus.store.types.TypeManagerImpl) pnc.getTypeManager();
+        Field clrField = tm.getClass().getDeclaredField("clr");
+        clrField.setAccessible(true);
+        ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(tm);
+        clearClr(clr);
+        Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr");
+        storeMgrField.setAccessible(true);
+        org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr =
+            (org.datanucleus.store.rdbms.RDBMSStoreManager) storeMgrField.get(pnc);
+        Field backingStoreField =
+            storeMgr.getClass().getDeclaredField("backingStoreByMemberName");
+        backingStoreField.setAccessible(true);
+        Map<String, Store> backingStoreByMemberName =
+            (Map<String, Store>) backingStoreField.get(storeMgr);
+        for (Store store : backingStoreByMemberName.values()) {
+          org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore =
+              (org.datanucleus.store.rdbms.scostore.BaseContainerStore) store;
+          clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class
+              .getDeclaredField("clr");
           clrField.setAccessible(true);
-          ClassLoaderResolver clr = (ClassLoaderResolver) clrField.get(tm);
-          clearClr(clr);
-          Field storeMgrField = pnc.getClass().getDeclaredField("storeMgr");
-          storeMgrField.setAccessible(true);
-          org.datanucleus.store.rdbms.RDBMSStoreManager storeMgr =
-              (org.datanucleus.store.rdbms.RDBMSStoreManager) storeMgrField.get(pnc);
-          Field backingStoreField =
-              storeMgr.getClass().getDeclaredField("backingStoreByMemberName");
-          backingStoreField.setAccessible(true);
-          Map<String, Store> backingStoreByMemberName =
-              (Map<String, Store>) backingStoreField.get(storeMgr);
-          for (Store store : backingStoreByMemberName.values()) {
-            org.datanucleus.store.rdbms.scostore.BaseContainerStore baseStore =
-                (org.datanucleus.store.rdbms.scostore.BaseContainerStore) store;
-            clrField = org.datanucleus.store.rdbms.scostore.BaseContainerStore.class
-                .getDeclaredField("clr");
-            clrField.setAccessible(true);
-            clr = (ClassLoaderResolver) clrField.get(baseStore);
-            clearClr(clr);
-          }
-        }
-        Field classLoaderResolverMap =
-            AbstractNucleusContext.class.getDeclaredField("classLoaderResolverMap");
-        classLoaderResolverMap.setAccessible(true);
-        Map<String, ClassLoaderResolver> loaderMap =
-            (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc);
-        for (ClassLoaderResolver clr : loaderMap.values()) {
+          clr = (ClassLoaderResolver) clrField.get(baseStore);
           clearClr(clr);
         }
-        classLoaderResolverMap.set(nc, new HashMap<String, 
ClassLoaderResolver>());
-        LOG.debug("Removed cached classloaders from DataNucleus 
NucleusContext");
-      } catch (Exception e) {
-        LOG.warn("Failed to remove cached classloaders from DataNucleus 
NucleusContext", e);
       }
-    } finally {
-      pmfWriteLock.unlock();
+      Field classLoaderResolverMap =
+          AbstractNucleusContext.class.getDeclaredField("classLoaderResolverMap");
+      classLoaderResolverMap.setAccessible(true);
+      Map<String, ClassLoaderResolver> loaderMap =
+          (Map<String, ClassLoaderResolver>) classLoaderResolverMap.get(nc);
+      for (ClassLoaderResolver clr : loaderMap.values()) {
+        clearClr(clr);
+      }
+      classLoaderResolverMap.set(nc, new HashMap<String, 
ClassLoaderResolver>());
+      LOG.debug("Removed cached classloaders from DataNucleus NucleusContext");
+    } catch (Exception e) {
+      LOG.warn("Failed to remove cached classloaders from DataNucleus 
NucleusContext", e);
     }
   }
 
@@ -411,7 +428,7 @@ public class PersistenceManagerProvider {
   public static PersistenceManager getPersistenceManager(boolean forCompactor) {
     pmfReadLock.lock();
     try {
-      if ((!forCompactor && pmf == null) || (forCompactor && compactorPmf == 
null)) {
+      if (pmf == null || (forCompactor && compactorPmf == null)) {
         throw new RuntimeException(
             "Cannot create PersistenceManager. PersistenceManagerFactory is 
not yet initialized");
       }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 74e854944b8..a52b9aec0a7 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -379,16 +379,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       if (connPool == null) {
         connPool = setupJdbcConnectionPool(conf, maxPoolSize);
       }
-
       if (connPoolMutex == null) {
-        /*the mutex pools should ideally be somewhat larger since some operations require 1
-           connection from each pool and we want to avoid taking a connection from primary pool
-           and then blocking because mutex pool is empty.  There is only 1 thread in any HMS trying
-           to mutex on each MUTEX_KEY except MUTEX_KEY.CheckLock.  The CheckLock operation gets a
-           connection from connPool first, then connPoolMutex.  All others, go in the opposite
-           order (not very elegant...).  So number of connection requests for connPoolMutex cannot
-           exceed (size of connPool + MUTEX_KEY.values().length - 1).*/
-        connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length);
+        connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize);
       }
 
       if (dbProduct == null) {
