This is an automated email from the ASF dual-hosted git repository.

dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a9479256387 HIVE-28578: Concurrency issue in updateTableColumnStatistics (#6159)
a9479256387 is described below

commit a9479256387d7f3a392e31da9c372eaa92375f27
Author: dengzh <[email protected]>
AuthorDate: Fri Nov 14 14:25:31 2025 +0800

    HIVE-28578: Concurrency issue in updateTableColumnStatistics (#6159)
---
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   8 +
 .../hadoop/hive/metastore/DatabaseProduct.java     |  73 +++++-
 .../apache/hadoop/hive/metastore/ObjectStore.java  | 260 ++++++++-------------
 .../hadoop/hive/metastore/tools/SQLGenerator.java  |   6 +-
 .../hive/metastore/utils/RetryingExecutor.java     | 134 +++++++++++
 .../hadoop/hive/metastore/DummyCustomRDBMS.java    |   2 +-
 .../hadoop/hive/metastore/TestObjectStore.java     |   9 +-
 7 files changed, 323 insertions(+), 169 deletions(-)

diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 71e99b0d32e..0b6fd083123 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1309,6 +1309,14 @@ public enum ConfVars {
             + "  seqprefix: adds a 'N_' prefix to the table name to get a unique location (table,1_table,2_table,...)\n"
             + "  prohibit: do not consider alternate locations; throw error if the default is not available\n"
             + "  force: use the default location even in case the directory is already available"),
+    METASTORE_S4U_NOWAIT_MAX_RETRIES("metastore.s4u.nowait.max.retries",
+        "hive.metastore.s4u.nowait.max.retries", 100,
+        "Maximum number of retries to acquire a row lock immediately without waiting."),
+    METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL(
+        "metastore.s4u.nowait.retry.sleep.interval",
+        "hive.metastore.s4u.nowait.retry.sleep.interval", 300, TimeUnit.MILLISECONDS,
+        "Sleep interval between the retries to acquire a row lock immediately, described as part of property "
+            + METASTORE_S4U_NOWAIT_MAX_RETRIES.name()),
 
     MULTITHREADED("javax.jdo.option.Multithreaded", "javax.jdo.option.Multithreaded", true,
         "Set this to true if multiple threads access metastore through JDO concurrently."),
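
As a reference for the two new settings above, a minimal sketch of how a caller would read them; only the ConfVars names, the 100/300ms defaults and the MetastoreConf.getIntVar/getTimeVar accessors come from this patch, the class and variable names below are illustrative:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
    import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;

    public class S4uConfExample {
      public static void main(String[] args) {
        Configuration conf = MetastoreConf.newMetastoreConf();
        // Defaults from the patch: 100 retries, 300ms between attempts.
        int maxRetries = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_S4U_NOWAIT_MAX_RETRIES);
        long sleepMs = MetastoreConf.getTimeVar(conf,
            ConfVars.METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
        System.out.println("s4u nowait: maxRetries=" + maxRetries + ", sleepMs=" + sleepMs);
      }
    }
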
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
index 2b012b9a9bb..686c1e9c371 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
@@ -32,6 +32,7 @@
 import java.util.stream.Stream;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -66,7 +67,9 @@ public class DatabaseProduct implements Configurable {
    */
   private static final ReentrantLock derbyLock = new ReentrantLock(true);
 
-  public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED};
+  public enum DbType {
+    DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED
+  };
   static public DbType dbType;
 
   // Singleton instance
@@ -75,6 +78,11 @@ public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED
   Configuration myConf;
 
   private String productName;
+
+  private String dbVersion;
+
+  private Pair<Integer, Integer> versionNums;
+
   /**
    * Protected constructor for singleton class
    */
@@ -92,7 +100,10 @@ public static DatabaseProduct determineDatabaseProduct(DataSource connPool,
       Configuration conf) {
     try (Connection conn = connPool.getConnection()) {
       String s = conn.getMetaData().getDatabaseProductName();
-      return determineDatabaseProduct(s, conf);
+      String version = conn.getMetaData().getDatabaseProductVersion();
+      int majorVersion = conn.getMetaData().getDatabaseMajorVersion();
+      int minorVersion = conn.getMetaData().getDatabaseMinorVersion();
+      return determineDatabaseProduct(s, version, Pair.of(majorVersion, minorVersion), conf);
     } catch (SQLException e) {
      throw new IllegalStateException("Unable to get database product name", e);
     }
@@ -103,8 +114,12 @@ public static DatabaseProduct determineDatabaseProduct(DataSource connPool,
    * @param productName string to defer database connection
    * @return database product type
    */
-  public static DatabaseProduct determineDatabaseProduct(String productName,
-      Configuration conf) {
+  public static DatabaseProduct determineDatabaseProduct(String productName, Configuration configuration) {
+    return determineDatabaseProduct(productName, null, null, configuration);
+  }
+
+  private static DatabaseProduct determineDatabaseProduct(String productName,
+      String version, Pair<Integer, Integer> versionNums, Configuration conf) {
     DbType dbt;
 
     Preconditions.checkNotNull(conf, "Configuration is null");
@@ -117,6 +132,12 @@ public static DatabaseProduct determineDatabaseProduct(String productName,
         dbt = DbType.CUSTOM;
       }
       Preconditions.checkState(theDatabaseProduct.dbType == dbt);
+      if (theDatabaseProduct.dbVersion == null && version != null) {
+        theDatabaseProduct.dbVersion = version;
+      }
+      if (theDatabaseProduct.versionNums == null && versionNums != null) {
+        theDatabaseProduct.versionNums = versionNums;
+      }
       return theDatabaseProduct;
     }
 
@@ -160,6 +181,12 @@ public static DatabaseProduct determineDatabaseProduct(String productName,
 
         theDatabaseProduct.dbType = dbt;
         theDatabaseProduct.productName = productName;
+        if (version != null) {
+          theDatabaseProduct.dbVersion = version;
+        }
+        if (versionNums != null) {
+          theDatabaseProduct.versionNums = versionNums;
+        }
       }
     }
     return theDatabaseProduct;
@@ -424,24 +451,34 @@ public String isWithinCheckInterval(String expr, long intervalInSeconds) throws
     return condition;
   }
 
-  public String addForUpdateClause(String selectStatement) throws MetaException {
+  public String addForUpdateClause(String selectStatement, boolean noWait) throws MetaException {
     switch (dbType) {
     case DERBY:
       //https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
       //sadly in Derby, FOR UPDATE doesn't meant what it should
       return selectStatement;
-    case MYSQL:
-      //http://dev.mysql.com/doc/refman/5.7/en/select.html
     case ORACLE:
       //https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html
     case POSTGRES:
       //http://www.postgresql.org/docs/9.0/static/sql-select.html
     case CUSTOM: // ANSI SQL
+      return selectStatement + " for update" + (noWait ? " NOWAIT" : "");
+    case MYSQL:
+      //http://dev.mysql.com/doc/refman/5.7/en/select.html
+      if (noWait) {
+        if (canMySQLSupportNoWait()) {
+          return selectStatement + " for update NOWAIT";
+        } else {
+          int selectLength = "select".length();
+          return selectStatement.trim().substring(0, selectLength) + " /*+ MAX_EXECUTION_TIME(300) */ " +
+              selectStatement.trim().substring(selectLength) + " for update";
+        }
+      }
       return selectStatement + " for update";
     case SQLSERVER:
       //https://msdn.microsoft.com/en-us/library/ms189499.aspx
       //https://msdn.microsoft.com/en-us/library/ms187373.aspx
-      String modifier = " with (updlock)";
+      String modifier = " with (updlock" + (noWait ? ",NOWAIT" : "") + ")";
       int wherePos = selectStatement.toUpperCase().indexOf(" WHERE ");
       if (wherePos < 0) {
         return selectStatement + modifier;
@@ -455,6 +492,26 @@ public String addForUpdateClause(String selectStatement) throws MetaException {
     }
   }
 
+  private boolean canMySQLSupportNoWait() {
+    if (versionNums == null) {
+      // Cannot determine the real version of back db
+      return false;
+    }
+    // Prior to MySQL 8.0.1, the NOWAIT clause for row locking was not supported directly in the s4u syntax.
+    // Use the MAX_EXECUTION_TIME to ensure the s4u does not run indefinitely.
+    String dbName = productName.replaceAll("\\s+", "").toLowerCase();
+    boolean isMariaDB = dbName.contains(MARIADB_NAME) ||
+        (dbVersion != null && dbVersion.toLowerCase().contains(MARIADB_NAME));
+    if (isMariaDB) {
+      // https://mariadb.com/docs/release-notes/community-server/old-releases/release-notes-mariadb-10-3-series/mariadb-1030-release-notes
+      return (versionNums.getLeft() >= 10 && versionNums.getRight() > 2);
+    } else {
+      // https://dev.mysql.com/blog-archive/mysql-8-0-1-using-skip-locked-and-nowait-to-handle-hot-rows/
+      return versionNums.getLeft() > 8 ||
+          (versionNums.getLeft() == 8 && dbVersion != null && dbVersion.compareToIgnoreCase("8.0.1") >= 0);
+    }
+  }
+
   /**
    * Add a limit clause to a given query
    * @param numRows
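
To make the effect of the new noWait flag concrete, here is a small standalone illustration that mirrors the clause-building logic in the hunks above; it deliberately does not call the Hive DatabaseProduct class, and the enum and method names are assumptions, only the generated SQL shapes and the 300ms MAX_EXECUTION_TIME hint follow the patch:

    // Standalone mirror of addForUpdateClause(select, noWait) for a few backends.
    public class ForUpdateClauseSketch {
      enum Db { POSTGRES, ORACLE, MYSQL_8, MYSQL_OLD, SQLSERVER }

      static String addForUpdate(Db db, String select, boolean noWait) {
        switch (db) {
          case POSTGRES:
          case ORACLE:
          case MYSQL_8:   // MySQL 8.0.1+ and MariaDB 10.3+ support NOWAIT directly
            return select + " for update" + (noWait ? " NOWAIT" : "");
          case MYSQL_OLD: // older MySQL: bound the wait with an optimizer hint instead
            return noWait
                ? "select /*+ MAX_EXECUTION_TIME(300) */ "
                    + select.substring("select".length()).trim() + " for update"
                : select + " for update";
          case SQLSERVER: {
            String modifier = " with (updlock" + (noWait ? ",NOWAIT" : "") + ")";
            int wherePos = select.toUpperCase().indexOf(" WHERE ");
            return wherePos < 0 ? select + modifier
                : select.substring(0, wherePos) + modifier + select.substring(wherePos);
          }
          default:
            return select;
        }
      }

      public static void main(String[] args) {
        String q = "select \"TBL_ID\" from \"TBLS\" where \"TBL_ID\" = 42";
        for (Db db : Db.values()) {
          System.out.println(db + ": " + addForUpdate(db, q, true));
        }
      }
    }
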
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 2da8955d5dd..659f47e7035 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -52,9 +52,10 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
+import java.util.function.Consumer;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -67,7 +68,6 @@
 import javax.jdo.datastore.JDOConnection;
 import javax.jdo.identity.IntIdentity;
 
-import com.google.common.util.concurrent.Striped;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -254,6 +254,7 @@
 import org.apache.hadoop.hive.metastore.utils.FileUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.RetryingExecutor;
 import org.apache.thrift.TException;
 import org.datanucleus.ExecutionContext;
 import org.datanucleus.api.jdo.JDOPersistenceManager;
@@ -344,8 +345,6 @@ private enum TXN_STATUS {
   private boolean areTxnStatsSupported = false;
   private PropertyStore propertyStore;
 
-  private static Striped<Lock> tablelocks;
-
   public ObjectStore() {
   }
 
@@ -395,15 +394,6 @@ public void setConf(Configuration conf) {
     } else {
       LOG.debug("Initialized ObjectStore");
     }
-
-    if (tablelocks == null) {
-      synchronized (ObjectStore.class) {
-        if (tablelocks == null) {
-          int numTableLocks = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_NUM_STRIPED_TABLE_LOCKS);
-          tablelocks = Striped.lazyWeakLock(numTableLocks);
-        }
-      }
-    }
   }
 
   @SuppressWarnings("nls")
@@ -9165,98 +9155,97 @@ private void writeMPartitionColumnStatistics(Table table, Partition partition,
     }
   }
 
-  /**
-   * Get table's column stats
-   *
-   * @return Map of column name and its stats
-   */
-  private Map<String, MTableColumnStatistics> getPartitionColStats(Table table, List<String> colNames, String engine)
-      throws MetaException {
-    Map<String, MTableColumnStatistics> statsMap = Maps.newHashMap();
-    List<MTableColumnStatistics> stats = getMTableColumnStatistics(table, colNames, engine);
-    for (MTableColumnStatistics cStat : stats) {
-      statsMap.put(cStat.getColName(), cStat);
-    }
-    return statsMap;
-  }
-  
   @Override
   public Map<String, String> updateTableColumnStatistics(ColumnStatistics colStats, String validWriteIds, long writeId)
       throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException {
     boolean committed = false;
-
     List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
     ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
-    
-    Lock tableLock = getTableLockFor(statsDesc.getDbName(), statsDesc.getTableName());
-    tableLock.lock();
+    long start = System.currentTimeMillis();
+    String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf);
     try {
       openTransaction();
      // DataNucleus objects get detached all over the place for no (real) reason.
      // So let's not use them anywhere unless absolutely necessary.
-      String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() : getDefaultCatalog(conf);
      MTable mTable = ensureGetMTable(catName, statsDesc.getDbName(), statsDesc.getTableName());
-      Table table = convertToTable(mTable);
-      List<String> colNames = new ArrayList<>();
-      for (ColumnStatisticsObj statsObj : statsObjs) {
-        colNames.add(statsObj.getColName());
-      }
-
-      Map<String, MTableColumnStatistics> oldStats = getPartitionColStats(table, colNames, colStats.getEngine());
-
-      for (ColumnStatisticsObj statsObj : statsObjs) {
-        MTableColumnStatistics mStatsObj = StatObjectConverter.convertToMTableColumnStatistics(
-          mTable, statsDesc,
-          statsObj, colStats.getEngine());
-        writeMTableColumnStatistics(table, mStatsObj, oldStats.get(statsObj.getColName()));
-        // There is no need to add colname again, otherwise we will get duplicate colNames.
-      }
-
-      // TODO: (HIVE-20109) ideally the col stats stats should be in colstats, not in the table!
-      // Set the table properties
-      // No need to check again if it exists.
-      String dbname = table.getDbName();
-      String name = table.getTableName();
-      MTable oldt = mTable;
-      Map<String, String> newParams = new HashMap<>(table.getParameters());
-      StatsSetupConst.setColumnStatsState(newParams, colNames);
-      boolean isTxn = TxnUtils.isTransactionalTable(oldt.getParameters());
-      if (isTxn) {
-        if (!areTxnStatsSupported) {
-          StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
-        } else {
-          String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname, name),
-            oldt.getParameters(), newParams, writeId, validWriteIds, true);
-          if (errorMsg != null) {
-            throw new MetaException(errorMsg);
-          }
-          if (!isCurrentStatsValidForTheQuery(oldt, validWriteIds, true)) {
-            // Make sure we set the flag to invalid regardless of the current value.
+      int maxRetries = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_S4U_NOWAIT_MAX_RETRIES);
+      long sleepInterval = MetastoreConf.getTimeVar(conf,
+          ConfVars.METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
+      Map<String, String> result = new RetryingExecutor<>(maxRetries, () -> {
+        Ref<Exception> exceptionRef = new Ref<>();
+        String savePoint = "uts_" + ThreadLocalRandom.current().nextInt(10000) + "_" + System.nanoTime();
+        setTransactionSavePoint(savePoint);
+        executePlainSQL(
+            sqlGenerator.addForUpdateNoWait("SELECT \"TBL_ID\" FROM \"TBLS\" WHERE \"TBL_ID\" = " + mTable.getId()),
+            exception -> {
+              rollbackTransactionToSavePoint(savePoint);
+              exceptionRef.t = exception;
+            });
+        if (exceptionRef.t != null) {
+          throw new RetryingExecutor.RetryException(exceptionRef.t);
+        }
+        pm.refresh(mTable);
+        Table table = convertToTable(mTable);
+        List<String> colNames = new ArrayList<>();
+        for (ColumnStatisticsObj statsObj : statsObjs) {
+          colNames.add(statsObj.getColName());
+        }
+
+        Map<String, MTableColumnStatistics> oldStats = Maps.newHashMap();
+        List<MTableColumnStatistics> stats = getMTableColumnStatistics(table, colNames, colStats.getEngine());
+        for (MTableColumnStatistics cStat : stats) {
+          oldStats.put(cStat.getColName(), cStat);
+        }
+
+        for (ColumnStatisticsObj statsObj : statsObjs) {
+          MTableColumnStatistics mStatsObj = StatObjectConverter.convertToMTableColumnStatistics(mTable, statsDesc,
+              statsObj, colStats.getEngine());
+          writeMTableColumnStatistics(table, mStatsObj, oldStats.get(statsObj.getColName()));
+          // There is no need to add colname again, otherwise we will get duplicate colNames.
+        }
+
+        // TODO: (HIVE-20109) ideally the col stats stats should be in colstats, not in the table!
+        // Set the table properties
+        // No need to check again if it exists.
+        String dbname = table.getDbName();
+        String name = table.getTableName();
+        MTable oldt = mTable;
+        Map<String, String> newParams = new HashMap<>(table.getParameters());
+        StatsSetupConst.setColumnStatsState(newParams, colNames);
+        boolean isTxn = TxnUtils.isTransactionalTable(oldt.getParameters());
+        if (isTxn) {
+          if (!areTxnStatsSupported) {
            StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
-            LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table "
-              + dbname + "." + name);
+          } else {
+            String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname, name), oldt.getParameters(), newParams,
+                writeId, validWriteIds, true);
+            if (errorMsg != null) {
+              throw new MetaException(errorMsg);
+            }
+            if (!isCurrentStatsValidForTheQuery(oldt, validWriteIds, true)) {
+              // Make sure we set the flag to invalid regardless of the current value.
+              StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
+              LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + dbname + "." + name);
+            }
+            oldt.setWriteId(writeId);
           }
-          oldt.setWriteId(writeId);
         }
-      }
-      oldt.setParameters(newParams);
-
+        oldt.setParameters(newParams);
+        return newParams;
+      }).onRetry(e -> e instanceof RetryingExecutor.RetryException)
+        .commandName("updateTableColumnStatistics").sleepInterval(sleepInterval, interval ->
+              ThreadLocalRandom.current().nextLong(sleepInterval) + 30).run();
       committed = commitTransaction();
      // TODO: similar to update...Part, this used to do "return committed;"; makes little sense.
-      return committed ? newParams : null;
+      return committed ? result : null;
     } finally {
-      try {
-        rollbackAndCleanup(committed, null);
-      } finally {
-        tableLock.unlock();
-      }
+      LOG.debug("{} updateTableColumnStatistics took {}ms, success: {}",
+          new TableName(catName, statsDesc.getDbName(), statsDesc.getTableName()),
+          System.currentTimeMillis() - start, committed);
+      rollbackAndCleanup(committed, null);
     }
   }
 
-  private Lock getTableLockFor(String dbName, String tblName) {
-    return tablelocks.get(dbName + "." + tblName);
-  }
-
   /**
    * Get partition's column stats
    *
@@ -11091,93 +11080,50 @@ public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, Stri
     return writeEventInfoList;
   }
 
-  private void prepareQuotes() throws SQLException {
+  private void executePlainSQL(String sql, Consumer<Exception> exceptionConsumer)
+      throws SQLException {
     String s = dbType.getPrepareTxnStmt();
-    if (s != null) {
-      assert pm.currentTransaction().isActive();
-      JDOConnection jdoConn = pm.getDataStoreConnection();
-      try (Statement statement = ((Connection) jdoConn.getNativeConnection()).createStatement()) {
+    assert pm.currentTransaction().isActive();
+    JDOConnection jdoConn = pm.getDataStoreConnection();
+    Connection conn = (Connection) jdoConn.getNativeConnection();
+    try (Statement statement = conn.createStatement()) {
+      if (s != null) {
         statement.execute(s);
-      } finally {
-        jdoConn.close();
       }
+      try {
+        statement.execute(sql);
+      } catch (SQLException e) {
+        if (exceptionConsumer != null) {
+          exceptionConsumer.accept(e);
+        } else {
+          throw e;
+        }
+      }
+    } finally {
+      jdoConn.close();
     }
   }
 
   private void lockNotificationSequenceForUpdate() throws MetaException {
+    int maxRetries =
+        MetastoreConf.getIntVar(conf, ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES);
+    long sleepInterval = MetastoreConf.getTimeVar(conf,
+        ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
     if (sqlGenerator.getDbProduct().isDERBY() && directSql != null) {
      // Derby doesn't allow FOR UPDATE to lock the row being selected (See https://db.apache
      // .org/derby/docs/10.1/ref/rrefsqlj31783.html) . So lock the whole table. Since there's
      // only one row in the table, this shouldn't cause any performance degradation.
-      new RetryingExecutor(conf, () -> {
+      new RetryingExecutor<Void>(maxRetries, () -> {
         directSql.lockDbTable("NOTIFICATION_SEQUENCE");
-      }).run();
+        return null;
+      }).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).run();
     } else {
      String selectQuery = "select \"NEXT_EVENT_ID\" from \"NOTIFICATION_SEQUENCE\"";
       String lockingQuery = sqlGenerator.addForUpdateClause(selectQuery);
-      new RetryingExecutor(conf, () -> {
-        prepareQuotes();
-        try (QueryWrapper query = new QueryWrapper(pm.newQuery("javax.jdo.query.SQL", lockingQuery))) {
-          query.setUnique(true);
-          // only need to execute it to get db Lock
-          query.execute();
-        }
-      }).run();
-    }
-  }
-
-  static class RetryingExecutor {
-    interface Command {
-      void process() throws Exception;
-    }
-
-    private static Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
-    private final int maxRetries;
-    private final long sleepInterval;
-    private int currentRetries = 0;
-    private final Command command;
-
-    RetryingExecutor(Configuration config, Command command) {
-      this.maxRetries =
-          MetastoreConf.getIntVar(config, ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES);
-      this.sleepInterval = MetastoreConf.getTimeVar(config,
-          ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
-      this.command = command;
-    }
-
-    public void run() throws MetaException {
-      while (true) {
-        try {
-          command.process();
-          break;
-        } catch (Exception e) {
-          LOG.info(
-              "Attempting to acquire the DB log notification lock: {} out of {}" +
-                " retries", currentRetries, maxRetries, e);
-          if (currentRetries >= maxRetries) {
-            String message =
-                "Couldn't acquire the DB log notification lock because we reached the maximum"
-                    + " # of retries: " + maxRetries
-                    + " retries. If this happens too often, then is recommended to "
-                    + "increase the maximum number of retries on the"
-                    + " hive.notification.sequence.lock.max.retries configuration";
-            LOG.error(message, e);
-            throw new MetaException(message + " :: " + e.getMessage());
-          }
-          currentRetries++;
-          try {
-            Thread.sleep(sleepInterval);
-          } catch (InterruptedException e1) {
-            String msg = "Couldn't acquire the DB notification log lock on " + currentRetries
-                + " retry, because the following error: ";
-            LOG.error(msg, e1);
-            throw new MetaException(msg + e1.getMessage());
-          }
-        }
-      }
-    }
-    public long getSleepInterval() {
-      return sleepInterval;
+      new RetryingExecutor<Void>(maxRetries, () -> {
+        executePlainSQL(lockingQuery, null);
+        return null;
+      }).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).run();
     }
   }
 
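The core of the ObjectStore change above replaces the per-JVM striped table lock with a row lock taken in the backing database itself: set a savepoint, issue SELECT ... FOR UPDATE NOWAIT against the table's TBLS row, and if the row is already locked, roll back to the savepoint and retry through RetryingExecutor. A simplified plain-JDBC sketch of that pattern follows; the method and variable names are assumptions, only the statement shape and the savepoint/retry flow mirror the patch:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Savepoint;
    import java.sql.Statement;

    public class NoWaitRowLockSketch {
      /** Tries to lock the TBLS row for tblId, retrying while another writer holds it. */
      static void lockTableRow(Connection conn, long tblId, int maxRetries, long sleepMs)
          throws SQLException, InterruptedException {
        conn.setAutoCommit(false);
        for (int attempt = 0; ; attempt++) {
          Savepoint sp = conn.setSavepoint("uts_" + attempt);
          try (Statement stmt = conn.createStatement()) {
            // NOWAIT makes the lock attempt fail fast instead of blocking on the other writer.
            stmt.execute("SELECT \"TBL_ID\" FROM \"TBLS\" WHERE \"TBL_ID\" = " + tblId
                + " FOR UPDATE NOWAIT");
            return; // row locked; the caller now rewrites the stats and commits
          } catch (SQLException e) {
            conn.rollback(sp); // undo only the failed lock attempt, keep the transaction open
            if (attempt >= maxRetries) {
              throw e;
            }
            Thread.sleep(sleepMs); // the patch adds jitter to this sleep
          }
        }
      }
    }
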
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index b793345dd94..7b3382265b2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -127,7 +127,11 @@ private List<String> createInsertValuesStmt(String tblColumns, List<String> rows
    * construct.  If the DB doesn't support, return original select.
    */
   public String addForUpdateClause(String selectStatement) throws MetaException {
-    return dbProduct.addForUpdateClause(selectStatement);
+    return dbProduct.addForUpdateClause(selectStatement, false);
+  }
+
+  public String addForUpdateNoWait(String selectStatement) throws MetaException {
+    return dbProduct.addForUpdateClause(selectStatement, true);
   }
 
   /**
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java
new file mode 100644
index 00000000000..dcaa10be831
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RetryingExecutor<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
+
+  private final int maxRetries;
+  private long sleepInterval = 1000;
+  private final Callable<T> command;
+  private Predicate<Exception> retryPolicy;
+  private int currentRetries = 0;
+  private String commandName;
+  private Function<Long, Long> sleepIntervalFunc;
+
+  public RetryingExecutor(int maxRetries, Callable<T> command) {
+    this.maxRetries = maxRetries;
+    this.command = command;
+    // default commandName unless specified
+    this.commandName = StackWalker.getInstance()
+        .walk(frames -> frames
+            .skip(1)
+            .findFirst()
+            .map(StackWalker.StackFrame::getMethodName)).get();
+  }
+
+  public RetryingExecutor<T> onRetry(Predicate<Exception> retryPolicy) {
+    this.retryPolicy = retryPolicy;
+    return this;
+  }
+
+  public RetryingExecutor<T> commandName(String name) {
+    this.commandName = name;
+    return this;
+  }
+
+  public RetryingExecutor<T> sleepInterval(long sleepInterval) {
+    return sleepInterval(sleepInterval, null);
+  }
+
+  public RetryingExecutor<T> sleepInterval(long sleepInterval,
+      Function<Long, Long> sleepIntervalFunc) {
+    this.sleepInterval = sleepInterval;
+    this.sleepIntervalFunc = sleepIntervalFunc;
+    return this;
+  }
+
+  public T run() throws MetaException {
+    while (true) {
+      try {
+        return command.call();
+      } catch (Exception e) {
+        checkException(e);
+        LOG.info("Attempting to retry the command:{} in {} out of {} retries",
+            commandName, currentRetries, maxRetries, e);
+        if (currentRetries >= maxRetries) {
+          String message = "Couldn't finish the command: " + commandName +
+              " because we reached the maximum of retries: " + maxRetries;
+          LOG.error(message, e);
+          throw new MetaException(message + " :: " + e.getMessage());
+        }
+        currentRetries++;
+        try {
+          Thread.sleep(getSleepInterval());
+        } catch (InterruptedException e1) {
+          String msg = "Couldn't run the command: " + commandName + " in " + currentRetries +
+              " retry, because the following error: ";
+          LOG.error(msg, e1);
+          throw new MetaException(msg + e1.getMessage());
+        }
+      }
+    }
+  }
+
+  private void checkException(Exception e) throws MetaException {
+    if (retryPolicy != null && !retryPolicy.test(e)) {
+      String message = "See a fatal exception, avoid to retry the command:" + commandName;
+      LOG.info(message, e);
+      String errorMessage = ExceptionUtils.getMessage(e);
+      if (e instanceof InvocationTargetException || e instanceof UndeclaredThrowableException) {
+        errorMessage = ExceptionUtils.getMessage(e.getCause());
+      }
+      Throwable rootCause = ExceptionUtils.getRootCause(e);
+      errorMessage += (rootCause == null ? "" : ("\nRoot cause: " + rootCause));
+      throw new MetaException(message + " :: " + errorMessage);
+    }
+  }
+
+  public static class RetryException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public RetryException(Exception ex) {
+      super(ex);
+    }
+
+    public RetryException(String msg) {
+      super(msg);
+    }
+  }
+
+  public long getSleepInterval() {
+    if (sleepIntervalFunc != null) {
+      this.sleepInterval = sleepIntervalFunc.apply(sleepInterval);
+    }
+    return sleepInterval;
+  }
+}
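
For context, a minimal self-contained example of how the fluent API of the new RetryingExecutor is meant to be used; the flaky task below is invented for illustration, only the constructor and the onRetry/commandName/sleepInterval/run methods come from this file:

    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.utils.RetryingExecutor;

    public class RetryingExecutorExample {
      public static void main(String[] args) throws MetaException {
        AtomicInteger attempts = new AtomicInteger();
        String result = new RetryingExecutor<String>(5, () -> {
          // Hypothetical flaky task: fails twice, then succeeds.
          if (attempts.incrementAndGet() < 3) {
            throw new RetryingExecutor.RetryException("row is locked, try again");
          }
          return "done after " + attempts.get() + " attempts";
        }).onRetry(e -> e instanceof RetryingExecutor.RetryException) // only retry this failure type
          .commandName("exampleCommand")
          .sleepInterval(50)
          .run();
        System.out.println(result);
      }
    }
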
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
index 444edd8812f..aa2ba60934a 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
@@ -74,7 +74,7 @@ public String isWithinCheckInterval(String expr, long intervalInSeconds) {
     return "DummyIsWithin";
   }
   @Override
-  public String addForUpdateClause(String selectStatement) {
+  public String addForUpdateClause(String selectStatement, boolean noWait) {
     return selectStatement + " for update";
   }
   @Override
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 07af321300e..f755aec6b19 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -21,7 +21,6 @@
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.hive.metastore.ObjectStore.RetryingExecutor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
 import org.apache.hadoop.hive.metastore.api.Catalog;
@@ -81,6 +80,7 @@
 import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
 import org.apache.hadoop.hive.metastore.model.MTable;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.RetryingExecutor;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
@@ -1273,7 +1273,12 @@ public void testQueryCloseOnError() throws Exception {
 
   @Test
   public void testRetryingExecutorSleep() throws Exception {
-    RetryingExecutor re = new ObjectStore.RetryingExecutor(MetastoreConf.newMetastoreConf(), null);
+    int maxRetries =
+        MetastoreConf.getIntVar(conf, ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES);
+    long sleepInterval = MetastoreConf.getTimeVar(conf,
+        ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
+    RetryingExecutor<Void> re = new RetryingExecutor<Void>(maxRetries, null)
+        .sleepInterval(sleepInterval);
     Assert.assertTrue("invalid sleep value", re.getSleepInterval() >= 0);
   }
 
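Beyond the sleep-interval assertion in testRetryingExecutorSleep, a natural follow-up test (not part of this commit, sketched here against the new API only, using the Assert and RetryingExecutor imports the test class already has) would verify that the executor actually retries until the command succeeds:

    @Test
    public void testRetryingExecutorRetriesUntilSuccess() throws Exception {
      java.util.concurrent.atomic.AtomicInteger calls = new java.util.concurrent.atomic.AtomicInteger();
      Integer result = new RetryingExecutor<Integer>(3, () -> {
        // Fail the first two calls, then return the attempt count.
        if (calls.incrementAndGet() < 3) {
          throw new RetryingExecutor.RetryException("not yet");
        }
        return calls.get();
      }).onRetry(e -> e instanceof RetryingExecutor.RetryException).sleepInterval(1).run();
      Assert.assertEquals(3, result.intValue());
    }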

Reply via email to