This is an automated email from the ASF dual-hosted git repository.
asinkovits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1db1da8042 HIVE-26176: Create a new connection pool for compaction (CompactionTxnHandler) (Antal Sinkovits, reviewed by Denys Kuzmenko and Peter Vary)
1db1da8042 is described below
commit 1db1da804271f65d4a63572cdd48f24425c1615c
Author: Antal Sinkovits <[email protected]>
AuthorDate: Thu Apr 28 12:33:24 2022 +0200
    HIVE-26176: Create a new connection pool for compaction (CompactionTxnHandler) (Antal Sinkovits, reviewed by Denys Kuzmenko and Peter Vary)
Closes #3223
---
.../hadoop/hive/metastore/conf/MetastoreConf.java | 3 +
.../metastore/datasource/DataSourceProvider.java | 12 +++-
.../datasource/DbCPDataSourceProvider.java | 5 +-
.../datasource/HikariCPDataSourceProvider.java | 11 ++--
.../hive/metastore/txn/CompactionTxnHandler.java | 67 +++++++++++++---------
.../hadoop/hive/metastore/txn/TxnHandler.java | 11 ++--
6 files changed, 65 insertions(+), 44 deletions(-)
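
This change gives compaction its own JDBC connection pool: CompactionTxnHandler now builds a static connPoolCompaction sized by the new metastore.compactor.connectionPool.maxPoolSize property (default 10), so compactor database traffic no longer competes with general metastore traffic for TxnHandler's connPool. A minimal sketch of overriding the default pool size, assuming a standard metastore Configuration; the value 20 is illustrative:

    // Sketch: raise the compactor pool cap before the metastore starts.
    // Assumes org.apache.hadoop.conf.Configuration and MetastoreConf imports.
    Configuration conf = MetastoreConf.newMetastoreConf();
    conf.setInt("metastore.compactor.connectionPool.maxPoolSize", 20);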
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 6b337aab94..a2f0e17a75 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -625,6 +625,9 @@ public class MetastoreConf {
"hive.compactor.cleaner.retry.retentionTime", 300,
TimeUnit.SECONDS, new TimeValidator(TimeUnit.SECONDS),
"Initial value of the cleaner retry retention time. The delay has
a backoff, and calculated the following way: " +
"pow(2, number_of_failed_attempts) *
HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME."),
+    HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS("metastore.compactor.connectionPool.maxPoolSize",
+        "hive.compactor.connectionPool.maxPoolSize", 10,
+        "Specify the maximum number of connections in the connection pool used by the compactor."),
CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName",
"javax.jdo.option.ConnectionDriverName",
"org.apache.derby.jdbc.EmbeddedDriver",
"Driver class name for a JDBC metastore"),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java
index 29633b500d..4e5803ead0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DataSourceProvider.java
@@ -32,7 +32,17 @@ public interface DataSourceProvider {
* @param hdpConfig
* @return the new connection pool
*/
- DataSource create(Configuration hdpConfig) throws SQLException;
+ default DataSource create(Configuration hdpConfig) throws SQLException {
+    int maxPoolSize = MetastoreConf.getIntVar(hdpConfig, MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
+ return create(hdpConfig, maxPoolSize);
+ }
+
+ /**
+ * @param hdpConfig
+ * @param maxPoolSize the maximum size of the connection pool
+ * @return the new connection pool
+ */
+  DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLException;
/**
   * Get the declared pooling type string. This is used to check against the constant in
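
The interface change is source compatible for callers: the old one-arg create becomes a default method that resolves the pool size from CONNECTION_POOLING_MAX_CONNECTIONS and delegates to the new two-arg variant, which implementations must now provide. A caller-side sketch of the resulting contract (the helper name and the omitted null handling are illustrative):

    // Illustrative helper: one config-sized pool plus one explicitly sized pool.
    static DataSource[] createPools(Configuration conf) throws SQLException {
      DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
      // Real code must handle dsp == null (no pooling provider configured).
      DataSource metastorePool = dsp.create(conf);      // default method, size from CONNECTION_POOLING_MAX_CONNECTIONS
      DataSource compactionPool = dsp.create(conf, 10); // explicit size, as CompactionTxnHandler now does
      return new DataSource[] { metastorePool, compactionPool };
    }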
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
index 4069a9350c..476a3d846f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/DbCPDataSourceProvider.java
@@ -57,7 +57,7 @@ public class DbCPDataSourceProvider implements DataSourceProvider {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- public DataSource create(Configuration hdpConfig) throws SQLException {
+  public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLException {
LOG.debug("Creating dbcp connection pool for the MetaStore");
String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig);
@@ -77,9 +77,6 @@ public class DbCPDataSourceProvider implements DataSourceProvider {
dbcpDs.setConnectionProperties(kv.getKey() + "=" + kv.getValue());
}
-    int maxPoolSize = hdpConfig.getInt(
-        MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getVarname(),
-        ((Long) MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS.getDefaultVal()).intValue());
     long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L);
int connectionMaxIlde = hdpConfig.getInt(CONNECTION_MAX_IDLE_PROPERTY, 8);
int connectionMinIlde = hdpConfig.getInt(CONNECTION_MIN_IDLE_PROPERTY, 0);
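
In DbCPDataSourceProvider the config lookup is simply deleted and the cap now arrives as the maxPoolSize parameter. As a rough illustration of what that cap means in DBCP2 terms (not this provider's exact wiring, which also configures the idle limits and connection properties shown above):

    // Plain DBCP2 illustration: the cap bounds the number of pooled connections.
    org.apache.commons.dbcp2.BasicDataSource ds = new org.apache.commons.dbcp2.BasicDataSource();
    ds.setMaxTotal(maxPoolSize);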
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
index 58b2bcfb4d..a48ce47ce1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/datasource/HikariCPDataSourceProvider.java
@@ -44,16 +44,13 @@ public class HikariCPDataSourceProvider implements DataSourceProvider {
   private static final String LEAK_DETECTION_THRESHOLD = HIKARI + ".leakDetectionThreshold";
@Override
- public DataSource create(Configuration hdpConfig) throws SQLException {
+  public DataSource create(Configuration hdpConfig, int maxPoolSize) throws SQLException {
LOG.debug("Creating Hikari connection pool for the MetaStore");
String driverUrl = DataSourceProvider.getMetastoreJdbcDriverUrl(hdpConfig);
String user = DataSourceProvider.getMetastoreJdbcUser(hdpConfig);
String passwd = DataSourceProvider.getMetastoreJdbcPasswd(hdpConfig);
- int maxPoolSize = MetastoreConf.getIntVar(hdpConfig,
- MetastoreConf.ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
-
Properties properties = replacePrefix(
DataSourceProvider.getPrefixedProperties(hdpConfig, HIKARI));
     long connectionTimeout = hdpConfig.getLong(CONNECTION_TIMEOUT_PROPERTY, 30000L);
@@ -75,14 +72,14 @@ public class HikariCPDataSourceProvider implements DataSourceProvider {
config.setConnectionTimeout(connectionTimeout);
     DatabaseProduct dbProduct = DatabaseProduct.determineDatabaseProduct(driverUrl, hdpConfig);
-
+
String s = dbProduct.getPrepareTxnStmt();
if (s!= null) {
config.setConnectionInitSql(s);
}
-
+
Map<String, String> props = dbProduct.getDataSourceProperties();
-
+
for ( Map.Entry<String, String> kv : props.entrySet()) {
config.addDataSourceProperty(kv.getKey(), kv.getValue());
}
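
HikariCPDataSourceProvider likewise drops its own config lookup and uses the parameter; in HikariCP the cap corresponds to HikariConfig.setMaximumPoolSize. A hedged sketch of the sizing portion only (the provider also wires the JDBC URL, credentials, and data source properties shown in this hunk):

    com.zaxxer.hikari.HikariConfig config = new com.zaxxer.hikari.HikariConfig();
    config.setMaximumPoolSize(maxPoolSize);         // caller-supplied cap replaces the removed lookup
    config.setConnectionTimeout(connectionTimeout); // still read from CONNECTION_TIMEOUT_PROPERTY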
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index ac22ab0563..5753c7863e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.metastore.txn;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -31,6 +32,7 @@ import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -55,6 +57,8 @@ class CompactionTxnHandler extends TxnHandler {
   static final private String CLASS_NAME = CompactionTxnHandler.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+ private static DataSource connPoolCompaction;
+
private static final String SELECT_COMPACTION_QUEUE_BY_TXN_ID =
"SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
+ "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\",
\"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
@@ -82,6 +86,17 @@ class CompactionTxnHandler extends TxnHandler {
public CompactionTxnHandler() {
}
+ @Override
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ synchronized (CompactionTxnHandler.class) {
+ if (connPoolCompaction == null) {
+        int maxPoolSize = MetastoreConf.getIntVar(conf, ConfVars.HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS);
+ connPoolCompaction = setupJdbcConnectionPool(conf, maxPoolSize);
+ }
+ }
+ }
+
/**
   * This will look through the completed_txn_components table and look for partitions or tables
   * that may be ready for compaction. Also, look through txns and txn_components tables for
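
The setConf override added above initializes the pool exactly once: connPoolCompaction is static and the null check runs under the class lock, so every CompactionTxnHandler instance in the process shares one pool sized by the new property. Each call site below then takes the same shape, passing the compaction pool to the two-arg getDbConn that this patch makes protected in TxnHandler:

    Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);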
@@ -107,7 +122,7 @@ class CompactionTxnHandler extends TxnHandler {
ResultSet rs = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
// Check for completed transactions
@@ -217,7 +232,7 @@ class CompactionTxnHandler extends TxnHandler {
Statement updStmt = null;
ResultSet rs = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
String query = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\",
\"CQ_PARTITION\", " +
"\"CQ_TYPE\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE
\"CQ_STATE\" = '" + INITIATED_STATE + "'";
@@ -301,7 +316,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" +
READY_FOR_CLEANING + "', "
+ "\"CQ_WORKER_ID\" = NULL"
@@ -343,7 +358,7 @@ class CompactionTxnHandler extends TxnHandler {
try {
List<CompactionInfo> rc = new ArrayList<>();
-      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
Statement stmt = dbConn.createStatement()) {
/*
        * By filtering on minOpenTxnWaterMark, we will only cleanup after every transaction is committed, that could see
@@ -417,7 +432,7 @@ class CompactionTxnHandler extends TxnHandler {
try {
Connection dbConn = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
long now = getDbTime(dbConn);
setCleanerStart(dbConn, info, now);
} catch (SQLException e) {
@@ -449,7 +464,7 @@ class CompactionTxnHandler extends TxnHandler {
try {
Connection dbConn = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
setCleanerStart(dbConn, info, -1L);
} catch (SQLException e) {
LOG.error("Unable to clear the cleaner start time for compaction
record " + e.getMessage());
@@ -515,7 +530,7 @@ class CompactionTxnHandler extends TxnHandler {
PreparedStatement pStmt = null;
ResultSet rs = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\",
\"CC_DATABASE\", "
+ "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\",
\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
+ "\"CC_START\", \"CC_END\", \"CC_RUN_AS\",
\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
@@ -658,7 +673,7 @@ class CompactionTxnHandler extends TxnHandler {
      // We query for minimum values in all the queries and they can only increase by any concurrent
// operations. So, READ COMMITTED is sufficient.
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
      // First need to find the min_uncommitted_txnid which is currently seen by any open transactions.
@@ -713,7 +728,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
String s;
@@ -814,7 +829,7 @@ class CompactionTxnHandler extends TxnHandler {
try {
      //Aborted and committed are terminal states, so nothing about the txn can change
//after that, so READ COMMITTED is sufficient.
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
/*
       * Only delete aborted / committed transaction in a way that guarantees two things:
@@ -885,7 +900,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL,
\"CQ_START\" = NULL, \"CQ_STATE\" = '"
+ INITIATED_STATE+ "' WHERE \"CQ_STATE\" = '" + WORKING_STATE + "'
AND \"CQ_WORKER_ID\" LIKE '"
@@ -931,7 +946,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
long latestValidStart = getDbTime(dbConn) - timeout;
stmt = dbConn.createStatement();
String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL,
\"CQ_START\" = NULL, \"CQ_STATE\" = '"
@@ -979,7 +994,7 @@ class CompactionTxnHandler extends TxnHandler {
ResultSet rs = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
String quote = getIdentifierQuoteString(dbConn);
StringBuilder bldr = new StringBuilder();
bldr.append("SELECT
").append(quote).append("COLUMN_NAME").append(quote)
@@ -1035,7 +1050,7 @@ class CompactionTxnHandler extends TxnHandler {
Statement stmt = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
String sqlText = "UPDATE \"COMPACTION_QUEUE\" SET
\"CQ_HIGHEST_WRITE_ID\" = " +
ci.highestWriteId + ", \"CQ_RUN_AS\" = " + quoteString(ci.runAs) +
", \"CQ_TXN_ID\" = " + compactionTxnId +
@@ -1175,7 +1190,7 @@ class CompactionTxnHandler extends TxnHandler {
    int refusedRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_REFUSED);
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
        /* cc_id is monotonically increasing so for any entity sorts in order of compaction history,
        thus this query groups by entity and withing group sorts most recent first */
@@ -1281,7 +1296,7 @@ class CompactionTxnHandler extends TxnHandler {
ResultSet rs = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
pStmt = dbConn.prepareStatement("SELECT \"CC_STATE\",
\"CC_ENQUEUE_TIME\" FROM \"COMPLETED_COMPACTIONS\" WHERE " +
"\"CC_DATABASE\" = ? AND " +
"\"CC_TABLE\" = ? " +
@@ -1342,7 +1357,7 @@ class CompactionTxnHandler extends TxnHandler {
PreparedStatement pStmt = null;
ResultSet rs = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\",
\"CQ_TABLE\", \"CQ_PARTITION\", "
+ "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\",
\"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
@@ -1440,7 +1455,7 @@ class CompactionTxnHandler extends TxnHandler {
@RetrySemantics.CannotRetry
  public void setCleanerRetryRetentionTimeOnError(CompactionInfo info) throws MetaException {
try {
-      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED)) {
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction)) {
try (PreparedStatement stmt = dbConn.prepareStatement("UPDATE
\"COMPACTION_QUEUE\" " +
"SET \"CQ_RETRY_RETENTION\" = ?, \"CQ_ERROR_MESSAGE\"= ? WHERE
\"CQ_ID\" = ?")) {
stmt.setLong(1, info.retryRetention);
@@ -1479,7 +1494,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
Statement stmt = null;
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
stmt = dbConn.createStatement();
String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_HADOOP_JOB_ID\" = " +
quoteString(hadoopJobId)
+ " WHERE \"CQ_ID\" = " + id;
@@ -1508,7 +1523,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
return getMinOpenTxnIdWaterMark(dbConn);
} catch (SQLException e) {
LOG.error("Unable to getMinOpenTxnIdForCleaner", e);
@@ -1540,7 +1555,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
long minOpenTxn;
try (Statement stmt = dbConn.createStatement()) {
try (ResultSet rs = stmt.executeQuery("SELECT
MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\"")) {
@@ -1602,7 +1617,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
return getCompactionByTxnId(dbConn, txnId);
} catch (SQLException e) {
LOG.error("Unable to getCompactionByTxnId", e);
@@ -1638,7 +1653,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
boolean updateRes;
        CompactionMetricsData prevMetricsData = getCompactionMetricsData(data, dbConn);
if (data.getMetricValue() >= data.getThreshold()) {
@@ -1676,7 +1691,7 @@ class CompactionTxnHandler extends TxnHandler {
List<CompactionMetricsData> metricsDataList = new ArrayList<>();
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
        for (CompactionMetricsData.MetricType type : CompactionMetricsData.MetricType.values()) {
          String query = sqlGenerator.addLimitClause(limit, NO_SELECT_COMPACTION_METRICS_CACHE_FOR_TYPE_QUERY);
try (PreparedStatement pstmt = dbConn.prepareStatement(query)) {
@@ -1715,7 +1730,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
        return getCompactionMetricsData(new CompactionMetricsData.Builder().dbName(dbName).tblName(tblName)
.partitionName(partitionName).metricType(type).build(), dbConn);
} catch (SQLException e) {
@@ -1764,7 +1779,7 @@ class CompactionTxnHandler extends TxnHandler {
Connection dbConn = null;
try {
try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
        removeCompactionMetricsData(dbConn, dbName, tblName, partitionName, type);
} catch (SQLException e) {
rollbackDBConn(dbConn);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b190b4bdf9..a94579fced 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -373,10 +373,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
this.conf = conf;
    int maxPoolSize = MetastoreConf.getIntVar(conf, ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
- long getConnectionTimeoutMs = 30000;
synchronized (TxnHandler.class) {
if (connPool == null) {
-        connPool = setupJdbcConnectionPool(conf, maxPoolSize, getConnectionTimeoutMs);
+ connPool = setupJdbcConnectionPool(conf, maxPoolSize);
}
if (connPoolMutex == null) {
@@ -387,7 +386,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         connection from connPool first, then connPoolMutex. All others, go in the opposite
         order (not very elegant...). So number of connection requests for connPoolMutex cannot
         exceed (size of connPool + MUTEX_KEY.values().length - 1).*/
-        connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length, getConnectionTimeoutMs);
+        connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length);
}
if (dbProduct == null) {
@@ -4676,7 +4675,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return getDbConn(isolationLevel, connPool);
}
-  private Connection getDbConn(int isolationLevel, DataSource connPool) throws SQLException {
+  protected Connection getDbConn(int isolationLevel, DataSource connPool) throws SQLException {
Connection dbConn = null;
try {
dbConn = connPool.getConnection();
@@ -5836,11 +5835,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
-  private synchronized static DataSource setupJdbcConnectionPool(Configuration conf, int maxPoolSize, long getConnectionTimeoutMs) {
+  protected synchronized static DataSource setupJdbcConnectionPool(Configuration conf, int maxPoolSize) {
    DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
if (dsp != null) {
try {
- return dsp.create(conf);
+ return dsp.create(conf, maxPoolSize);
} catch (SQLException e) {
LOG.error("Unable to instantiate JDBC connection pooling", e);
throw new RuntimeException(e);
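
With this last hunk, setupJdbcConnectionPool no longer takes a connection timeout argument: both pool providers already read CONNECTION_TIMEOUT_PROPERTY themselves with the same 30000 ms default, so the hard-coded parameter was redundant. The resulting pool setup across the patch, summarized as a sketch (compactorMaxPoolSize is an illustrative name for the value read from the new ConfVar):

    connPool = setupJdbcConnectionPool(conf, maxPoolSize);                                  // general metastore pool
    connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize + MUTEX_KEY.values().length); // mutex pool
    connPoolCompaction = setupJdbcConnectionPool(conf, compactorMaxPoolSize);               // new compaction pool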