This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 4b01a607091 HIVE-27958: Refactor DirectSqlUpdatePart class (Wechar Yu, reviewed by Attila Turoczy, Denys Kuzmenko)
4b01a607091 is described below
commit 4b01a607091581ac9bdb372f8b47c1efca4d4bb4
Author: Wechar Yu <[email protected]>
AuthorDate: Tue Feb 6 17:15:18 2024 +0800
HIVE-27958: Refactor DirectSqlUpdatePart class (Wechar Yu, reviewed by Attila Turoczy, Denys Kuzmenko)
Closes #5003
---
.../hadoop/hive/metastore/DatabaseProduct.java | 23 +++
.../hadoop/hive/metastore/DirectSqlUpdatePart.java | 192 +++++++--------------
.../hive/metastore/txn/retry/SqlRetryHandler.java | 27 +--
3 files changed, 87 insertions(+), 155 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
index 642057bd69a..b2b20503d24 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
@@ -27,6 +27,7 @@ import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -57,6 +58,11 @@ public class DatabaseProduct implements Configurable {
DeadlineException.class
};
+ /**
+ * Derby specific concurrency control
+ */
+ private static final ReentrantLock derbyLock = new ReentrantLock(true);
+
public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED};
public DbType dbType;
@@ -776,4 +782,21 @@ public class DatabaseProduct implements Configurable {
public void setConf(Configuration c) {
myConf = c;
}
+
+ /**
+ * lockInternal() and {@link #unlockInternal()} are used to serialize those operations that require
+ * Select ... For Update to sequence operations properly. In practice that means when running
+ * with Derby database. See more notes at class level.
+ */
+ public void lockInternal() {
+ if (isDERBY()) {
+ derbyLock.lock();
+ }
+ }
+
+ public void unlockInternal() {
+ if (isDERBY()) {
+ derbyLock.unlock();
+ }
+ }
}
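
The lockInternal()/unlockInternal() pair added to DatabaseProduct above centralizes the
Derby-only serialization that DirectSqlUpdatePart and SqlRetryHandler previously each
guarded with their own static ReentrantLock. A minimal caller-side sketch, assuming a
DatabaseProduct instance is at hand (runCriticalSection is a hypothetical stand-in for
work that relies on SELECT ... FOR UPDATE ordering):

    // Sketch only: runCriticalSection is a hypothetical stand-in.
    void withDerbySerialization(DatabaseProduct dbType, Runnable runCriticalSection) {
      dbType.lockInternal();        // no-op unless the backing database is Derby
      try {
        runCriticalSection.run();   // e.g. a SELECT ... FOR UPDATE sequence
      } finally {
        dbType.unlockInternal();    // released even if the critical section throws
      }
    }

Since the lock is now a single static ReentrantLock on DatabaseProduct, all callers
contend on the same fair lock, which was not guaranteed while each class held its own copy.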
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
index 67c293ee64f..441ce26ac6d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
@@ -67,7 +67,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE;
@@ -92,8 +91,6 @@ class DirectSqlUpdatePart {
private final int maxBatchSize;
private final SQLGenerator sqlGenerator;
- private static final ReentrantLock derbyLock = new ReentrantLock(true);
-
public DirectSqlUpdatePart(PersistenceManager pm, Configuration conf, DatabaseProduct dbType, int batchSize) {
this.pm = pm;
@@ -103,23 +100,6 @@ class DirectSqlUpdatePart {
sqlGenerator = new SQLGenerator(dbType, conf);
}
- /**
- * {@link #lockInternal()} and {@link #unlockInternal()} are used to serialize those operations that require
- * Select ... For Update to sequence operations properly. In practice that means when running
- * with Derby database. See more notes at class level.
- */
- private void lockInternal() {
- if(dbType.isDERBY()) {
- derbyLock.lock();
- }
- }
-
- private void unlockInternal() {
- if(dbType.isDERBY()) {
- derbyLock.unlock();
- }
- }
-
void rollbackDBConn(Connection dbConn) {
try {
if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
@@ -138,43 +118,16 @@ class DirectSqlUpdatePart {
}
}
- void closeStmt(Statement stmt) {
- try {
- if (stmt != null && !stmt.isClosed()) stmt.close();
- } catch (SQLException e) {
- LOG.warn("Failed to close statement ", e);
- }
- }
-
- void close(ResultSet rs) {
- try {
- if (rs != null && !rs.isClosed()) {
- rs.close();
- }
- }
- catch(SQLException ex) {
- LOG.warn("Failed to close statement ", ex);
- }
- }
-
static String quoteString(String input) {
return "'" + input + "'";
}
- void close(ResultSet rs, Statement stmt, JDOConnection dbConn) {
- close(rs);
- closeStmt(stmt);
- closeDbConn(dbConn);
- }
-
private void populateInsertUpdateMap(Map<PartitionInfo, ColumnStatistics> statsPartInfoMap,
Map<PartColNameInfo, MPartitionColumnStatistics> updateMap,
Map<PartColNameInfo, MPartitionColumnStatistics>insertMap,
Connection dbConn, Table tbl) throws SQLException, MetaException, NoSuchObjectException {
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
- Statement statement = null;
- ResultSet rs = null;
List<String> queries = new ArrayList<>();
Set<PartColNameInfo> selectedParts = new HashSet<>();
@@ -186,16 +139,14 @@ class DirectSqlUpdatePart {
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
partIdList, "\"PART_ID\"", true, false);
- for (String query : queries) {
- try {
- statement = dbConn.createStatement();
- LOG.debug("Going to execute query " + query);
- rs = statement.executeQuery(query);
- while (rs.next()) {
- selectedParts.add(new PartColNameInfo(rs.getLong(1), rs.getString(2), rs.getString(3)));
+ try (Statement statement = dbConn.createStatement()) {
+ for (String query : queries) {
+ LOG.debug("Execute query: " + query);
+ try (ResultSet rs = statement.executeQuery(query)) {
+ while (rs.next()) {
+ selectedParts.add(new PartColNameInfo(rs.getLong(1), rs.getString(2), rs.getString(3)));
+ }
}
- } finally {
- close(rs, statement, null);
}
}
@@ -246,13 +197,13 @@ class DirectSqlUpdatePart {
partIds.add(partColNameInfo.partitionId);
pst.addBatch();
if (partIds.size() == maxBatchSize) {
- LOG.debug("Going to execute updates on part: {}", partIds);
+ LOG.debug("Execute updates on part: {}", partIds);
verifyUpdates(pst.executeBatch(), partIds);
partIds = new ArrayList<>();
}
}
if (!partIds.isEmpty()) {
- LOG.debug("Going to execute updates on part: {}", partIds);
+ LOG.debug("Execute updates on part: {}", partIds);
verifyUpdates(pst.executeBatch(), partIds);
}
}
@@ -270,7 +221,6 @@ class DirectSqlUpdatePart {
private void insertIntoPartColStatTable(Map<PartColNameInfo, MPartitionColumnStatistics> insertMap,
long maxCsId,
Connection dbConn) throws SQLException, MetaException, NoSuchObjectException {
- PreparedStatement preparedStatement = null;
int numRows = 0;
String insert = "INSERT INTO \"PART_COL_STATS\" (\"CS_ID\", \"CAT_NAME\", \"DB_NAME\","
+ "\"TABLE_NAME\", \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", \"PART_ID\","
@@ -279,8 +229,7 @@ class DirectSqlUpdatePart {
+ " \"HISTOGRAM\", \"AVG_COL_LEN\", \"MAX_COL_LEN\",
\"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\", \"ENGINE\") values "
+ "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?)";
- try {
- preparedStatement = dbConn.prepareStatement(insert);
+ try (PreparedStatement preparedStatement = dbConn.prepareStatement(insert)) {
for (Map.Entry entry : insertMap.entrySet()) {
PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
Long partId = partColNameInfo.partitionId;
@@ -323,8 +272,6 @@ class DirectSqlUpdatePart {
if (numRows != 0) {
preparedStatement.executeBatch();
}
- } finally {
- closeStmt(preparedStatement);
}
}
@@ -332,8 +279,6 @@ class DirectSqlUpdatePart {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
- Statement statement = null;
- ResultSet rs = null;
prefix.append("select \"PART_ID\", \"PARAM_VALUE\" "
+ " from \"PARTITION_PARAMS\" where "
@@ -343,18 +288,17 @@ class DirectSqlUpdatePart {
partIdList, "\"PART_ID\"", true, false);
Map<Long, String> partIdToParaMap = new HashMap<>();
- for (String query : queries) {
- try {
- statement = dbConn.createStatement();
- LOG.debug("Going to execute query " + query);
- rs = statement.executeQuery(query);
- while (rs.next()) {
- partIdToParaMap.put(rs.getLong(1), rs.getString(2));
+ try (Statement statement = dbConn.createStatement()) {
+ for (String query : queries) {
+ LOG.debug("Execute query: " + query);
+ try (ResultSet rs = statement.executeQuery(query)) {
+ while (rs.next()) {
+ partIdToParaMap.put(rs.getLong(1), rs.getString(2));
+ }
}
- } finally {
- close(rs, statement, null);
}
}
+
return partIdToParaMap;
}
@@ -367,14 +311,10 @@ class DirectSqlUpdatePart {
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
partIdList, "\"PART_ID\"", false, false);
- Statement statement = null;
- for (String query : queries) {
- try {
- statement = dbConn.createStatement();
- LOG.debug("Going to execute update " + query);
+ try (Statement statement = dbConn.createStatement()) {
+ for (String query : queries) {
+ LOG.debug("Execute update: " + query);
statement.executeUpdate(query);
- } finally {
- closeStmt(statement);
}
}
}
@@ -503,8 +443,6 @@ class DirectSqlUpdatePart {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
- Statement statement = null;
- ResultSet rs = null;
Map<PartitionInfo, ColumnStatistics> partitionInfoMap = new HashMap<>();
List<String> partKeys = partColStatsMap.keySet().stream().map(
@@ -516,20 +454,18 @@ class DirectSqlUpdatePart {
TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
partKeys, "\"PART_NAME\"", true, false);
- for (String query : queries) {
- // Select for update makes sure that the partitions are not modified while the stats are getting updated.
- query = sqlGenerator.addForUpdateClause(query);
- try {
- statement = dbConn.createStatement();
- LOG.debug("Going to execute query <" + query + ">");
- rs = statement.executeQuery(query);
- while (rs.next()) {
- PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1),
- rs.getLong(2), rs.getString(3));
- partitionInfoMap.put(partitionInfo, partColStatsMap.get(rs.getString(3)));
+ try (Statement statement = dbConn.createStatement()) {
+ for (String query : queries) {
+ // Select for update makes sure that the partitions are not modified while the stats are getting updated.
+ query = sqlGenerator.addForUpdateClause(query);
+ LOG.debug("Execute query: " + query);
+ try (ResultSet rs = statement.executeQuery(query)) {
+ while (rs.next()) {
+ PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1),
+ rs.getLong(2), rs.getString(3));
+ partitionInfoMap.put(partitionInfo, partColStatsMap.get(rs.getString(3)));
+ }
}
- } finally {
- close(rs, statement, null);
}
}
return partitionInfoMap;
@@ -556,7 +492,7 @@ class DirectSqlUpdatePart {
Connection dbConn = null;
boolean committed = false;
try {
- lockInternal();
+ dbType.lockInternal();
jdoConn = pm.getDataStoreConnection();
dbConn = (Connection) (jdoConn.getNativeConnection());
@@ -606,7 +542,7 @@ class DirectSqlUpdatePart {
rollbackDBConn(dbConn);
}
closeDbConn(jdoConn);
- unlockInternal();
+ dbType.unlockInternal();
}
}
@@ -615,15 +551,13 @@ class DirectSqlUpdatePart {
* @return The CD id before update.
*/
public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws MetaException {
- Statement statement = null;
- ResultSet rs = null;
long maxCsId = 0;
boolean committed = false;
Connection dbConn = null;
JDOConnection jdoConn = null;
try {
- lockInternal();
+ dbType.lockInternal();
jdoConn = pm.getDataStoreConnection();
dbConn = (Connection) (jdoConn.getNativeConnection());
@@ -637,43 +571,41 @@ class DirectSqlUpdatePart {
String query = sqlGenerator.addForUpdateClause("SELECT \"NEXT_VAL\" FROM \"SEQUENCE_TABLE\" "
+ "WHERE \"SEQUENCE_NAME\"= "
+ quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
- LOG.debug("Going to execute query " + query);
- statement = dbConn.createStatement();
- rs = statement.executeQuery(query);
- if (rs.next()) {
- maxCsId = rs.getLong(1);
- } else if (insertDone) {
- throw new MetaException("Invalid state of SEQUENCE_TABLE for MPartitionColumnStatistics");
- } else {
- insertDone = true;
- closeStmt(statement);
- statement = dbConn.createStatement();
- query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\",
\"NEXT_VAL\") VALUES ( "
- +
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics")
+ "," + 1
- + ")";
- try {
- statement.executeUpdate(query);
- } catch (SQLException e) {
- // If the record is already inserted by some other thread continue to select.
- if (dbType.isDuplicateKeyError(e)) {
- continue;
+ LOG.debug("Execute query: " + query);
+ try (Statement statement = dbConn.createStatement();
+ ResultSet rs = statement.executeQuery(query)) {
+ if (rs.next()) {
+ maxCsId = rs.getLong(1);
+ } else if (insertDone) {
+ throw new MetaException("Invalid state of SEQUENCE_TABLE for MPartitionColumnStatistics");
+ } else {
+ insertDone = true;
+ query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\",
\"NEXT_VAL\") VALUES ( "
+ +
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics")
+ "," + 1
+ + ")";
+ try {
+ statement.executeUpdate(query);
+ } catch (SQLException e) {
+ // If the record is already inserted by some other thread continue to select.
+ if (dbType.isDuplicateKeyError(e)) {
+ continue;
+ }
+ LOG.error("Unable to insert into SEQUENCE_TABLE for
MPartitionColumnStatistics.", e);
+ throw e;
}
- LOG.error("Unable to insert into SEQUENCE_TABLE for
MPartitionColumnStatistics.", e);
- throw e;
- } finally {
- closeStmt(statement);
}
}
}
long nextMaxCsId = maxCsId + numStats + 1;
- closeStmt(statement);
- statement = dbConn.createStatement();
String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = "
+ nextMaxCsId
+ " WHERE \"SEQUENCE_NAME\" = "
+ quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
- statement.executeUpdate(query);
+
+ try (Statement statement = dbConn.createStatement()) {
+ statement.executeUpdate(query);
+ }
dbConn.commit();
committed = true;
return maxCsId;
@@ -685,8 +617,8 @@ class DirectSqlUpdatePart {
if (!committed) {
rollbackDBConn(dbConn);
}
- close(rs, statement, jdoConn);
- unlockInternal();
+ closeDbConn(jdoConn);
+ dbType.unlockInternal();
}
}
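
The recurring change in the DirectSqlUpdatePart.java hunks above swaps the hand-rolled
close(ResultSet), closeStmt(Statement) and close(rs, stmt, conn) helpers for
try-with-resources, which closes each resource automatically, in reverse declaration
order, even when the body throws. A minimal standalone sketch of the idiom (the query
text is a placeholder):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    class TryWithResourcesSketch {
      // rs is closed first, then statement, whether the body
      // returns normally or throws SQLException.
      static long readSingleLong(Connection dbConn, String query) throws SQLException {
        try (Statement statement = dbConn.createStatement();
             ResultSet rs = statement.executeQuery(query)) {
          return rs.next() ? rs.getLong(1) : 0L;
        }
      }
    }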
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java
index 0b7127b6826..727ba0b1bf8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java
@@ -33,7 +33,6 @@ import org.springframework.jdbc.UncategorizedSQLException;
import java.sql.SQLException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
/**
@@ -48,11 +47,6 @@ public class SqlRetryHandler {
private final StackThreadLocal<Object> threadLocal = new StackThreadLocal<>();
- /**
- * Derby specific concurrency control
- */
- private static final ReentrantLock derbyLock = new ReentrantLock(true);
-
private final DatabaseProduct databaseProduct;
private final long deadlockRetryInterval;
private final long retryInterval;
@@ -122,14 +116,14 @@ public class SqlRetryHandler {
try {
if (properties.isLockInternally()) {
- lockInternal();
+ databaseProduct.lockInternal();
}
threadLocal.set(new Object());
return executeWithRetryInternal(properties, function);
} finally {
threadLocal.unset();
if (properties.isLockInternally()) {
- unlockInternal();
+ databaseProduct.unlockInternal();
}
}
}
@@ -269,21 +263,4 @@ public class SqlRetryHandler {
}
return false;
}
-
- /**
- * lockInternal() and {@link #unlockInternal()} are used to serialize those operations that require
- * Select ... For Update to sequence operations properly. In practice that means when running
- * with Derby database. See more notes at class level.
- */
- private void lockInternal() {
- if(databaseProduct.isDERBY()) {
- derbyLock.lock();
- }
- }
- private void unlockInternal() {
- if(databaseProduct.isDERBY()) {
- derbyLock.unlock();
- }
- }
-
}
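
For context, the sequence bootstrap in getNextCSIdForMPartitionColumnStatistics keeps
its insert-or-select shape through the refactor: select the SEQUENCE_TABLE row FOR
UPDATE, and if it is absent, insert it, treating a duplicate-key error as having lost
the race to another thread and looping back to the select. A compact sketch of that
pattern, with a Predicate standing in for DatabaseProduct.isDuplicateKeyError() and
caller-supplied SQL strings:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.function.Predicate;

    class SequenceBootstrapSketch {
      // Sketch only: selectForUpdate and insert are caller-supplied SQL.
      static long ensureSequenceRow(Connection dbConn, String selectForUpdate,
          String insert, Predicate<SQLException> isDuplicateKey) throws SQLException {
        boolean insertDone = false;
        while (true) {
          try (Statement statement = dbConn.createStatement();
               ResultSet rs = statement.executeQuery(selectForUpdate)) {
            if (rs.next()) {
              return rs.getLong(1);  // row exists; the FOR UPDATE lock is held until commit
            }
            if (insertDone) {
              throw new SQLException("SEQUENCE_TABLE row missing after successful insert");
            }
            insertDone = true;
            try {
              statement.executeUpdate(insert);  // first caller creates the row
            } catch (SQLException e) {
              // A duplicate key means another thread won the race; retry the select.
              if (!isDuplicateKey.test(e)) {
                throw e;
              }
            }
          }
        }
      }
    }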