This is an automated email from the ASF dual-hosted git repository.

asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 1db1da8042 HIVE-26176: Create a new connection pool for compaction 
(CompactionTxnHandler) (Antal Sinkovits, reviewed by Denys Kuzmenko and Peter 
Vary)
1db1da8042 is described below

commit 1db1da804271f65d4a63572cdd48f24425c1615c
Author: Antal Sinkovits <[email protected]>
AuthorDate: Thu Apr 28 12:33:24 2022 +0200

    HIVE-26176: Create a new connection pool for compaction 
(CompactionTxnHandler) (Antal Sinkovits, reviewed by Denys Kuzmenko and Peter 
Vary)
    
    Closes #3223
---
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |  3 +
 .../metastore/datasource/DataSourceProvider.java   | 12 +++-
 .../datasource/DbCPDataSourceProvider.java         |  5 +-
 .../datasource/HikariCPDataSourceProvider.java     | 11 ++--
 .../hive/metastore/txn/CompactionTxnHandler.java   | 67 +++++++++++++---------
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 11 ++--
 6 files changed, 65 insertions(+), 44 deletions(-)

diff --git 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 6b337aab94..a2f0e17a75 100644
--- 
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ 
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -625,6 +625,9 @@ public class MetastoreConf {
             "hive.compactor.cleaner.retry.retentionTime", 300, 
TimeUnit.SECONDS, new TimeValidator(TimeUnit.SECONDS),
             "Initial value of the cleaner retry retention time. The delay has 
a backoff, and calculated the following way: " +
             "pow(2, number_of_failed_attempts) * 
HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME."),
+    
HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS("metastore.compactor.connectionPool.maxPoolSize",
+            "hive.compactor.connectionPool.maxPoolSize", 10,
+            "Specify the maximum number of connections in the connection pool 
used by the compactor."),
     CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName",
         "javax.jdo.option.ConnectionDriverName", 
"org.apache.derby.jdbc.EmbeddedDriver",
         "Driver class name for a JDBC metastore"),
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java
index 29633b500d..4e5803ead0 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java
@@ -32,7 +32,17 @@ public interface DataSourceProvider {
    * @param hdpConfig
    * @return the new connection pool
    */
-  DataSource create(Configuration hdpConfig) throws SQLException;
+  default DataSource create(Configuration hdpConfig) throws SQLException {
+    int maxPoolSize = MetastoreConf.getIntVar(hdpConfig, 
MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
+    return create(hdpConfig, maxPoolSize);
+  }
+
+  /**
+   * @param hdpConfig
+   * @param maxPoolSize the maximum size of the connection pool
+   * @return the new connection pool
+   */
+  DataSource create(Configuration hdpConfig, int maxPoolSize) throws 
SQLException;
 
   /**
    * Get the declared pooling type string. This is used to check against the 
constant in
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
index 4069a9350c..476a3d846f 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
@@ -57,7 +57,7 @@ public class DbCPDataSourceProvider implements 
DataSourceProvider {
 
   @SuppressWarnings({ "rawtypes", "unchecked" })
   @Override
-  public DataSource create(Configuration hdpConfig) throws SQLException {
+  public DataSource create(Configuration hdpConfig, int maxPoolSize) throws 
SQLException {
     LOG.debug("Creating dbcp connection pool for the MetaStore");
 
     String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig);
@@ -77,9 +77,6 @@ public class DbCPDataSourceProvider implements 
DataSourceProvider {
       dbcpDs.setConnectionProperties(kv.getKey() + "=" + kv.getValue());
     }
 
-    int maxPoolSize = hdpConfig.getInt(
-            
MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(),
-            ((Long) 
MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getDefaultVal()).intValue());
     long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 
30000L);
     int connectionMaxIlde = hdpConfig.getInt(CONNECTION_MAX_IDLE_PROPERTY, 8);
     int connectionMinIlde = hdpConfig.getInt(CONNECTION_MIN_IDLE_PROPERTY, 0);
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
index 58b2bcfb4d..a48ce47ce1 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
@@ -44,16 +44,13 @@ public class HikariCPDataSourceProvider implements 
DataSourceProvider {
   private static final String LEAK_DETECTION_THRESHOLD = HIKARI + 
".leakDetectionThreshold";
 
   @Override
-  public DataSource create(Configuration hdpConfig) throws SQLException {
+  public DataSource create(Configuration hdpConfig, int maxPoolSize) throws 
SQLException {
     LOG.debug("Creating Hikari connection pool for the MetaStore");
 
     String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig);
     String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig);
     String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig);
 
-    int maxPoolSize = MetastoreConf.getIntVar(hdpConfig,
-        MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
-
     Properties properties = replacePrefix(
         DataSourceProvider.getPrefixedProperties(hdpConfig, HIKARI));
     long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 
30000L);
@@ -75,14 +72,14 @@ public class HikariCPDataSourceProvider implements 
DataSourceProvider {
     config.setConnectionTimeout(connectionTimeout);
 
     DatabaseProduct dbProduct =  
DatabaseProduct.determineDatabaseProduct(driverUrl, hdpConfig);
-    
+
     String s = dbProduct.getPrepareTxnStmt();
     if (s!= null) {
       config.setConnectionInitSql(s);
     }
-    
+
     Map<String, String> props = dbProduct.getDataSourceProperties();
-    
+
     for ( Map.Entry<String, String> kv : props.entrySet()) {
       config.addDataSourceProperty(kv.getKey(), kv.getValue());
     }
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index ac22ab0563..5753c7863e 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -31,6 +32,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -55,6 +57,8 @@ class CompactionTxnHandler extends TxnHandler {
   static final private String CLASS_NAME = 
CompactionTxnHandler.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
+  private static DataSource connPoolCompaction;
+
   private static final String SELECT_COMPACTION_QUEUE_BY_TXN_ID =
       "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
           + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", 
\"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
@@ -82,6 +86,17 @@ class CompactionTxnHandler extends TxnHandler {
   public CompactionTxnHandler() {
   }
 
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    synchronized (CompactionTxnHandler.class) {
+      if (connPoolCompaction == null) {
+        int maxPoolSize = MetastoreConf.getIntVar(conf, 
ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS);
+        connPoolCompaction = setupJdbcConnectionPool(conf, maxPoolSize);
+      }
+    }
+  }
+
   /**
    * This will look through the completed_txn_components table and look for 
partitions or tables
    * that may be ready for compaction.  Also, look through txns and 
txn_components tables for
@@ -107,7 +122,7 @@ class CompactionTxnHandler extends TxnHandler {
     ResultSet rs = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
 
         // Check for completed transactions
@@ -217,7 +232,7 @@ class CompactionTxnHandler extends TxnHandler {
       Statement updStmt = null;
       ResultSet rs = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
         String query = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", 
\"CQ_PARTITION\", " +
           "\"CQ_TYPE\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE 
\"CQ_STATE\" = '" + INITIATED_STATE + "'";
@@ -301,7 +316,7 @@ class CompactionTxnHandler extends TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
         String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + 
READY_FOR_CLEANING + "', "
             + "\"CQ_WORKER_ID\" = NULL"
@@ -343,7 +358,7 @@ class CompactionTxnHandler extends TxnHandler {
     try {
       List<CompactionInfo> rc = new ArrayList<>();
 
-      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
           Statement stmt = dbConn.createStatement()) {
         /*
          * By filtering on minOpenTxnWaterMark, we will only cleanup after 
every transaction is committed, that could see
@@ -417,7 +432,7 @@ class CompactionTxnHandler extends TxnHandler {
     try {
       Connection dbConn = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         long now = getDbTime(dbConn);
         setCleanerStart(dbConn, info, now);
       } catch (SQLException e) {
@@ -449,7 +464,7 @@ class CompactionTxnHandler extends TxnHandler {
     try {
       Connection dbConn = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         setCleanerStart(dbConn, info, -1L);
       } catch (SQLException e) {
         LOG.error("Unable to clear the cleaner start time for compaction 
record  " + e.getMessage());
@@ -515,7 +530,7 @@ class CompactionTxnHandler extends TxnHandler {
       PreparedStatement pStmt = null;
       ResultSet rs = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", 
\"CC_DATABASE\", "
             + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", 
\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
             + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", 
\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
@@ -658,7 +673,7 @@ class CompactionTxnHandler extends TxnHandler {
 
         // We query for minimum values in all the queries and they can only 
increase by any concurrent
         // operations. So, READ COMMITTED is sufficient.
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
 
         // First need to find the min_uncommitted_txnid which is currently 
seen by any open transactions.
@@ -713,7 +728,7 @@ class CompactionTxnHandler extends TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
 
         String s;
@@ -814,7 +829,7 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         //Aborted and committed are terminal states, so nothing about the txn 
can change
         //after that, so READ COMMITTED is sufficient.
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
         /*
          * Only delete aborted / committed transaction in a way that 
guarantees two things:
@@ -885,7 +900,7 @@ class CompactionTxnHandler extends TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
         String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, 
\"CQ_START\" = NULL, \"CQ_STATE\" = '"
           + INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "' 
AND \"CQ_WORKER_ID\" LIKE '"
@@ -931,7 +946,7 @@ class CompactionTxnHandler extends TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         long latestValidStart = getDbTime(dbConn) - timeout;
         stmt = dbConn.createStatement();
         String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, 
\"CQ_START\" = NULL, \"CQ_STATE\" = '"
@@ -979,7 +994,7 @@ class CompactionTxnHandler extends TxnHandler {
     ResultSet rs = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         String quote = getIdentifierQuoteString(dbConn);
         StringBuilder bldr = new StringBuilder();
         bldr.append("SELECT 
").append(quote).append("COLUMN_NAME").append(quote)
@@ -1035,7 +1050,7 @@ class CompactionTxnHandler extends TxnHandler {
     Statement stmt = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
         String sqlText = "UPDATE \"COMPACTION_QUEUE\" SET 
\"CQ_HIGHEST_WRITE_ID\" = " +
             ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) + 
", \"CQ_TXN_ID\" = " + compactionTxnId +
@@ -1175,7 +1190,7 @@ class CompactionTxnHandler extends TxnHandler {
     int refusedRetention = MetastoreConf.getIntVar(conf, 
ConfVars.COMPACTOR_HISTORY_RETENTION_REFUSED);
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
         /* cc_id is monotonically increasing so for any entity sorts in order 
of compaction history,
         thus this query groups by entity and withing group sorts most recent 
first */
@@ -1281,7 +1296,7 @@ class CompactionTxnHandler extends TxnHandler {
     ResultSet rs = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         pStmt = dbConn.prepareStatement("SELECT \"CC_STATE\", 
\"CC_ENQUEUE_TIME\" FROM \"COMPLETED_COMPACTIONS\" WHERE " +
           "\"CC_DATABASE\" = ? AND " +
           "\"CC_TABLE\" = ? " +
@@ -1342,7 +1357,7 @@ class CompactionTxnHandler extends TxnHandler {
       PreparedStatement pStmt = null;
       ResultSet rs = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
         pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", 
\"CQ_TABLE\", \"CQ_PARTITION\", "
                 + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", 
\"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
@@ -1440,7 +1455,7 @@ class CompactionTxnHandler extends TxnHandler {
   @RetrySemantics.CannotRetry
   public void setCleanerRetryRetentionTimeOnError(CompactionInfo info) throws 
MetaException {
     try {
-      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED)) {
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction)) {
         try (PreparedStatement stmt = dbConn.prepareStatement("UPDATE 
\"COMPACTION_QUEUE\" " +
                 "SET \"CQ_RETRY_RETENTION\" = ?, \"CQ_ERROR_MESSAGE\"= ? WHERE 
\"CQ_ID\" = ?")) {
           stmt.setLong(1, info.retryRetention);
@@ -1479,7 +1494,7 @@ class CompactionTxnHandler extends TxnHandler {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         stmt = dbConn.createStatement();
         String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HADOOP_JOB_ID\" = " + 
quoteString(hadoopJobId)
             + " WHERE \"CQ_ID\" = " + id;
@@ -1508,7 +1523,7 @@ class CompactionTxnHandler extends TxnHandler {
     Connection dbConn = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         return getMinOpenTxnIdWaterMark(dbConn);
       } catch (SQLException e) {
         LOG.error("Unable to getMinOpenTxnIdForCleaner", e);
@@ -1540,7 +1555,7 @@ class CompactionTxnHandler extends TxnHandler {
     Connection dbConn = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         long minOpenTxn;
         try (Statement stmt = dbConn.createStatement()) {
           try (ResultSet rs = stmt.executeQuery("SELECT 
MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\"")) {
@@ -1602,7 +1617,7 @@ class CompactionTxnHandler extends TxnHandler {
     Connection dbConn = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         return getCompactionByTxnId(dbConn, txnId);
       } catch (SQLException e) {
         LOG.error("Unable to getCompactionByTxnId", e);
@@ -1638,7 +1653,7 @@ class CompactionTxnHandler extends TxnHandler {
     Connection dbConn = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         boolean updateRes;
         CompactionMetricsData prevMetricsData = getCompactionMetricsData(data, 
dbConn);
         if (data.getMetricValue() >= data.getThreshold()) {
@@ -1676,7 +1691,7 @@ class CompactionTxnHandler extends TxnHandler {
     List<CompactionMetricsData> metricsDataList = new ArrayList<>();
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         for (CompactionMetricsData.MetricType type : 
CompactionMetricsData.MetricType.values()) {
           String query = sqlGenerator.addLimitClause(limit, 
NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
           try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
@@ -1715,7 +1730,7 @@ class CompactionTxnHandler extends TxnHandler {
     Connection dbConn = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         return getCompactionMetricsData(new 
CompactionMetricsData.Builder().dbName(dbName).tblName(tblName)
             .partitionName(partitionName).metricType(type).build(), dbConn);
       } catch (SQLException e) {
@@ -1764,7 +1779,7 @@ class CompactionTxnHandler extends TxnHandler {
     Connection dbConn = null;
     try {
       try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
         removeCompactionMetricsData(dbConn, dbName, tblName, partitionName, 
type);
       } catch (SQLException e) {
         rollbackDBConn(dbConn);
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b190b4bdf9..a94579fced 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -373,10 +373,9 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     this.conf = conf;
 
     int maxPoolSize = MetastoreConf.getIntVar(conf, 
ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
-    long getConnectionTimeoutMs = 30000;
     synchronized (TxnHandler.class) {
       if (connPool == null) {
-        connPool = setupJdbcConnectionPool(conf, maxPoolSize, 
getConnectionTimeoutMs);
+        connPool = setupJdbcConnectionPool(conf, maxPoolSize);
       }
 
       if (connPoolMutex == null) {
@@ -387,7 +386,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
            connection from connPool first, then connPoolMutex.  All others, go 
in the opposite
            order (not very elegant...).  So number of connection requests for 
connPoolMutex cannot
            exceed (size of connPool + MUTEX_KEY.values().length - 1).*/
-        connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + 
MUTEX_KEY.values().length, getConnectionTimeoutMs);
+        connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + 
MUTEX_KEY.values().length);
       }
 
       if (dbProduct == null) {
@@ -4676,7 +4675,7 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     return getDbConn(isolationLevel, connPool);
   }
 
-  private Connection getDbConn(int isolationLevel, DataSource connPool) throws 
SQLException {
+  protected Connection getDbConn(int isolationLevel, DataSource connPool) 
throws SQLException {
     Connection dbConn = null;
     try {
       dbConn = connPool.getConnection();
@@ -5836,11 +5835,11 @@ abstract class TxnHandler implements TxnStore, 
TxnStore.MutexAPI {
     }
   }
 
-  private synchronized static DataSource setupJdbcConnectionPool(Configuration 
conf, int maxPoolSize, long getConnectionTimeoutMs) {
+  protected synchronized static DataSource 
setupJdbcConnectionPool(Configuration conf, int maxPoolSize) {
     DataSourceProvider dsp = 
DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
     if (dsp != null) {
       try {
-        return dsp.create(conf);
+        return dsp.create(conf, maxPoolSize);
       } catch (SQLException e) {
         LOG.error("Unable to instantiate JDBC connection pooling", e);
         throw new RuntimeException(e);

Reply via email to