This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b01a607091 HIVE-27958: Refactor DirectSqlUpdatePart class (Wechar Yu, reviewed by Attila Turoczy, Denys Kuzmenko)
4b01a607091 is described below

commit 4b01a607091581ac9bdb372f8b47c1efca4d4bb4
Author: Wechar Yu <yuwq1...@gmail.com>
AuthorDate: Tue Feb 6 17:15:18 2024 +0800

    HIVE-27958: Refactor DirectSqlUpdatePart class (Wechar Yu, reviewed by Attila Turoczy, Denys Kuzmenko)
    
    Closes #5003
---
 .../hadoop/hive/metastore/DatabaseProduct.java     |  23 +++
 .../hadoop/hive/metastore/DirectSqlUpdatePart.java | 192 +++++++--------------
 .../hive/metastore/txn/retry/SqlRetryHandler.java  |  27 +--
 3 files changed, 87 insertions(+), 155 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
index 642057bd69a..b2b20503d24 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
@@ -27,6 +27,7 @@ import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -57,6 +58,11 @@ public class DatabaseProduct implements Configurable {
           DeadlineException.class
   };
 
+  /**
+   * Derby specific concurrency control
+   */
+  private static final ReentrantLock derbyLock = new ReentrantLock(true);
+
   public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, 
UNDEFINED};
   public DbType dbType;
 
@@ -776,4 +782,21 @@ public class DatabaseProduct implements Configurable {
   public void setConf(Configuration c) {
     myConf = c;
   }
+
+  /**
+   * lockInternal() and {@link #unlockInternal()} are used to serialize those 
operations that require
+   * Select ... For Update to sequence operations properly.  In practice that 
means when running
+   * with Derby database.  See more notes at class level.
+   */
+  public void lockInternal() {
+    if (isDERBY()) {
+      derbyLock.lock();
+    }
+  }
+
+  public void unlockInternal() {
+    if (isDERBY()) {
+      derbyLock.unlock();
+    }
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
index 67c293ee64f..441ce26ac6d 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java
@@ -67,7 +67,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE;
@@ -92,8 +91,6 @@ class DirectSqlUpdatePart {
   private final int maxBatchSize;
   private final SQLGenerator sqlGenerator;
 
-  private static final ReentrantLock derbyLock = new ReentrantLock(true);
-
   public DirectSqlUpdatePart(PersistenceManager pm, Configuration conf,
                              DatabaseProduct dbType, int batchSize) {
     this.pm = pm;
@@ -103,23 +100,6 @@ class DirectSqlUpdatePart {
     sqlGenerator = new SQLGenerator(dbType, conf);
   }
 
-  /**
-   * {@link #lockInternal()} and {@link #unlockInternal()} are used to 
serialize those operations that require
-   * Select ... For Update to sequence operations properly.  In practice that 
means when running
-   * with Derby database.  See more notes at class level.
-   */
-  private void lockInternal() {
-    if(dbType.isDERBY()) {
-      derbyLock.lock();
-    }
-  }
-
-  private void unlockInternal() {
-    if(dbType.isDERBY()) {
-      derbyLock.unlock();
-    }
-  }
-
   void rollbackDBConn(Connection dbConn) {
     try {
       if (dbConn != null && !dbConn.isClosed()) dbConn.rollback();
@@ -138,43 +118,16 @@ class DirectSqlUpdatePart {
     }
   }
 
-  void closeStmt(Statement stmt) {
-    try {
-      if (stmt != null && !stmt.isClosed()) stmt.close();
-    } catch (SQLException e) {
-      LOG.warn("Failed to close statement ", e);
-    }
-  }
-
-  void close(ResultSet rs) {
-    try {
-      if (rs != null && !rs.isClosed()) {
-        rs.close();
-      }
-    }
-    catch(SQLException ex) {
-      LOG.warn("Failed to close statement ", ex);
-    }
-  }
-
   static String quoteString(String input) {
     return "'" + input + "'";
   }
 
-  void close(ResultSet rs, Statement stmt, JDOConnection dbConn) {
-    close(rs);
-    closeStmt(stmt);
-    closeDbConn(dbConn);
-  }
-
   private void populateInsertUpdateMap(Map<PartitionInfo, ColumnStatistics> 
statsPartInfoMap,
                                        Map<PartColNameInfo, 
MPartitionColumnStatistics> updateMap,
                                        Map<PartColNameInfo, 
MPartitionColumnStatistics>insertMap,
                                        Connection dbConn, Table tbl) throws 
SQLException, MetaException, NoSuchObjectException {
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
-    Statement statement = null;
-    ResultSet rs = null;
     List<String> queries = new ArrayList<>();
     Set<PartColNameInfo> selectedParts = new HashSet<>();
 
@@ -186,16 +139,14 @@ class DirectSqlUpdatePart {
     TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
             partIdList, "\"PART_ID\"", true, false);
 
-    for (String query : queries) {
-      try {
-        statement = dbConn.createStatement();
-        LOG.debug("Going to execute query " + query);
-        rs = statement.executeQuery(query);
-        while (rs.next()) {
-          selectedParts.add(new PartColNameInfo(rs.getLong(1), 
rs.getString(2), rs.getString(3)));
+    try (Statement statement = dbConn.createStatement()) {
+      for (String query : queries) {
+        LOG.debug("Execute query: " + query);
+        try (ResultSet rs = statement.executeQuery(query)) {
+          while (rs.next()) {
+            selectedParts.add(new PartColNameInfo(rs.getLong(1), 
rs.getString(2), rs.getString(3)));
+          }
         }
-      } finally {
-        close(rs, statement, null);
       }
     }
 
@@ -246,13 +197,13 @@ class DirectSqlUpdatePart {
           partIds.add(partColNameInfo.partitionId);
           pst.addBatch();
           if (partIds.size() == maxBatchSize) {
-            LOG.debug("Going to execute updates on part: {}", partIds);
+            LOG.debug("Execute updates on part: {}", partIds);
             verifyUpdates(pst.executeBatch(), partIds);
             partIds = new ArrayList<>();
           }
         }
         if (!partIds.isEmpty()) {
-          LOG.debug("Going to execute updates on part: {}", partIds);
+          LOG.debug("Execute updates on part: {}", partIds);
           verifyUpdates(pst.executeBatch(), partIds);
         }
       }
@@ -270,7 +221,6 @@ class DirectSqlUpdatePart {
   private void insertIntoPartColStatTable(Map<PartColNameInfo, 
MPartitionColumnStatistics> insertMap,
                                           long maxCsId,
                                           Connection dbConn) throws 
SQLException, MetaException, NoSuchObjectException {
-    PreparedStatement preparedStatement = null;
     int numRows = 0;
     String insert = "INSERT INTO \"PART_COL_STATS\" (\"CS_ID\", \"CAT_NAME\", 
\"DB_NAME\","
             + "\"TABLE_NAME\", \"PARTITION_NAME\", \"COLUMN_NAME\", 
\"COLUMN_TYPE\", \"PART_ID\","
@@ -279,8 +229,7 @@ class DirectSqlUpdatePart {
             + " \"HISTOGRAM\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", 
\"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\", \"ENGINE\") values "
             + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?)";
 
-    try {
-      preparedStatement = dbConn.prepareStatement(insert);
+    try (PreparedStatement preparedStatement = 
dbConn.prepareStatement(insert)) {
       for (Map.Entry entry : insertMap.entrySet()) {
         PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey();
         Long partId = partColNameInfo.partitionId;
@@ -323,8 +272,6 @@ class DirectSqlUpdatePart {
       if (numRows != 0) {
         preparedStatement.executeBatch();
       }
-    } finally {
-      closeStmt(preparedStatement);
     }
   }
 
@@ -332,8 +279,6 @@ class DirectSqlUpdatePart {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
-    Statement statement = null;
-    ResultSet rs = null;
 
     prefix.append("select \"PART_ID\", \"PARAM_VALUE\" "
             + " from \"PARTITION_PARAMS\" where "
@@ -343,18 +288,17 @@ class DirectSqlUpdatePart {
             partIdList, "\"PART_ID\"", true, false);
 
     Map<Long, String> partIdToParaMap = new HashMap<>();
-    for (String query : queries) {
-      try {
-        statement = dbConn.createStatement();
-        LOG.debug("Going to execute query " + query);
-        rs = statement.executeQuery(query);
-        while (rs.next()) {
-          partIdToParaMap.put(rs.getLong(1), rs.getString(2));
+    try (Statement statement = dbConn.createStatement()) {
+      for (String query : queries) {
+        LOG.debug("Execute query: " + query);
+        try (ResultSet rs = statement.executeQuery(query)) {
+          while (rs.next()) {
+            partIdToParaMap.put(rs.getLong(1), rs.getString(2));
+          }
         }
-      } finally {
-        close(rs, statement, null);
       }
     }
+
     return partIdToParaMap;
   }
 
@@ -367,14 +311,10 @@ class DirectSqlUpdatePart {
     TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
             partIdList, "\"PART_ID\"", false, false);
 
-    Statement statement = null;
-    for (String query : queries) {
-      try {
-        statement = dbConn.createStatement();
-        LOG.debug("Going to execute update " + query);
+    try (Statement statement = dbConn.createStatement()) {
+      for (String query : queries) {
+        LOG.debug("Execute update: " + query);
         statement.executeUpdate(query);
-      } finally {
-        closeStmt(statement);
       }
     }
   }
@@ -503,8 +443,6 @@ class DirectSqlUpdatePart {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
-    Statement statement = null;
-    ResultSet rs = null;
     Map<PartitionInfo, ColumnStatistics> partitionInfoMap = new HashMap<>();
 
     List<String> partKeys = partColStatsMap.keySet().stream().map(
@@ -516,20 +454,18 @@ class DirectSqlUpdatePart {
     TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
             partKeys, "\"PART_NAME\"", true, false);
 
-    for (String query : queries) {
-      // Select for update makes sure that the partitions are not modified 
while the stats are getting updated.
-      query = sqlGenerator.addForUpdateClause(query);
-      try {
-        statement = dbConn.createStatement();
-        LOG.debug("Going to execute query <" + query + ">");
-        rs = statement.executeQuery(query);
-        while (rs.next()) {
-          PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1),
-                  rs.getLong(2), rs.getString(3));
-          partitionInfoMap.put(partitionInfo, 
partColStatsMap.get(rs.getString(3)));
+    try (Statement statement = dbConn.createStatement()) {
+      for (String query : queries) {
+        // Select for update makes sure that the partitions are not modified 
while the stats are getting updated.
+        query = sqlGenerator.addForUpdateClause(query);
+        LOG.debug("Execute query: " + query);
+        try (ResultSet rs = statement.executeQuery(query)) {
+          while (rs.next()) {
+            PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1),
+                rs.getLong(2), rs.getString(3));
+            partitionInfoMap.put(partitionInfo, 
partColStatsMap.get(rs.getString(3)));
+          }
         }
-      } finally {
-        close(rs, statement, null);
       }
     }
     return partitionInfoMap;
@@ -556,7 +492,7 @@ class DirectSqlUpdatePart {
     Connection dbConn = null;
     boolean committed = false;
     try {
-      lockInternal();
+      dbType.lockInternal();
       jdoConn = pm.getDataStoreConnection();
       dbConn = (Connection) (jdoConn.getNativeConnection());
 
@@ -606,7 +542,7 @@ class DirectSqlUpdatePart {
         rollbackDBConn(dbConn);
       }
       closeDbConn(jdoConn);
-      unlockInternal();
+      dbType.unlockInternal();
     }
   }
 
@@ -615,15 +551,13 @@ class DirectSqlUpdatePart {
    * @return The CD id before update.
    */
   public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws 
MetaException {
-    Statement statement = null;
-    ResultSet rs = null;
     long maxCsId = 0;
     boolean committed = false;
     Connection dbConn = null;
     JDOConnection jdoConn = null;
 
     try {
-      lockInternal();
+      dbType.lockInternal();
       jdoConn = pm.getDataStoreConnection();
       dbConn = (Connection) (jdoConn.getNativeConnection());
 
@@ -637,43 +571,41 @@ class DirectSqlUpdatePart {
         String query = sqlGenerator.addForUpdateClause("SELECT \"NEXT_VAL\" 
FROM \"SEQUENCE_TABLE\" "
                 + "WHERE \"SEQUENCE_NAME\"= "
                 + 
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"));
-        LOG.debug("Going to execute query " + query);
-        statement = dbConn.createStatement();
-        rs = statement.executeQuery(query);
-        if (rs.next()) {
-          maxCsId = rs.getLong(1);
-        } else if (insertDone) {
-          throw new MetaException("Invalid state of SEQUENCE_TABLE for 
MPartitionColumnStatistics");
-        } else {
-          insertDone = true;
-          closeStmt(statement);
-          statement = dbConn.createStatement();
-          query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", 
\"NEXT_VAL\")  VALUES ( "
-                  + 
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics")
 + "," + 1
-                  + ")";
-          try {
-            statement.executeUpdate(query);
-          } catch (SQLException e) {
-            // If the record is already inserted by some other thread continue 
to select.
-            if (dbType.isDuplicateKeyError(e)) {
-              continue;
+        LOG.debug("Execute query: " + query);
+        try (Statement statement = dbConn.createStatement();
+             ResultSet rs = statement.executeQuery(query)) {
+          if (rs.next()) {
+            maxCsId = rs.getLong(1);
+          } else if (insertDone) {
+            throw new MetaException("Invalid state of SEQUENCE_TABLE for 
MPartitionColumnStatistics");
+          } else {
+            insertDone = true;
+            query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", 
\"NEXT_VAL\")  VALUES ( "
+                    + 
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics")
 + "," + 1
+                    + ")";
+            try {
+              statement.executeUpdate(query);
+            } catch (SQLException e) {
+              // If the record is already inserted by some other thread 
continue to select.
+              if (dbType.isDuplicateKeyError(e)) {
+                continue;
+              }
+              LOG.error("Unable to insert into SEQUENCE_TABLE for 
MPartitionColumnStatistics.", e);
+              throw e;
             }
-            LOG.error("Unable to insert into SEQUENCE_TABLE for 
MPartitionColumnStatistics.", e);
-            throw e;
-          } finally {
-            closeStmt(statement);
           }
         }
       }
 
       long nextMaxCsId = maxCsId + numStats + 1;
-      closeStmt(statement);
-      statement = dbConn.createStatement();
       String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = "
               + nextMaxCsId
               + " WHERE \"SEQUENCE_NAME\" = "
               + 
quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics");
-      statement.executeUpdate(query);
+
+      try (Statement statement = dbConn.createStatement()) {
+        statement.executeUpdate(query);
+      }
       dbConn.commit();
       committed = true;
       return maxCsId;
@@ -685,8 +617,8 @@ class DirectSqlUpdatePart {
       if (!committed) {
         rollbackDBConn(dbConn);
       }
-      close(rs, statement, jdoConn);
-      unlockInternal();
+      closeDbConn(jdoConn);
+      dbType.unlockInternal();
     }
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java
index 0b7127b6826..727ba0b1bf8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java
@@ -33,7 +33,6 @@ import org.springframework.jdbc.UncategorizedSQLException;
 import java.sql.SQLException;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
 /**
@@ -48,11 +47,6 @@ public class SqlRetryHandler {
 
   private final StackThreadLocal<Object> threadLocal = new 
StackThreadLocal<>();
 
-  /**
-   * Derby specific concurrency control
-   */
-  private static final ReentrantLock derbyLock = new ReentrantLock(true);
-
   private final DatabaseProduct databaseProduct;
   private final long deadlockRetryInterval;
   private final long retryInterval;
@@ -122,14 +116,14 @@ public class SqlRetryHandler {
     
     try {
       if (properties.isLockInternally()) {
-        lockInternal();
+        databaseProduct.lockInternal();
       }
       threadLocal.set(new Object());
       return executeWithRetryInternal(properties, function);
     } finally {
       threadLocal.unset();
       if (properties.isLockInternally()) {
-        unlockInternal();
+        databaseProduct.unlockInternal();
       }
     }
   }
@@ -269,21 +263,4 @@ public class SqlRetryHandler {
     }
     return false;
   }
-
-  /**
-   * lockInternal() and {@link #unlockInternal()} are used to serialize those 
operations that require
-   * Select ... For Update to sequence operations properly.  In practice that 
means when running
-   * with Derby database.  See more notes at class level.
-   */
-  private void lockInternal() {
-    if(databaseProduct.isDERBY()) {
-      derbyLock.lock();
-    }
-  }
-  private void unlockInternal() {
-    if(databaseProduct.isDERBY()) {
-      derbyLock.unlock();
-    }
-  }
-
 }

Reply via email to