This is an automated email from the ASF dual-hosted git repository.
dengzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a9479256387 HIVE-28578: Concurrency issue in
updateTableColumnStatistics (#6159)
a9479256387 is described below
commit a9479256387d7f3a392e31da9c372eaa92375f27
Author: dengzh <[email protected]>
AuthorDate: Fri Nov 14 14:25:31 2025 +0800
HIVE-28578: Concurrency issue in updateTableColumnStatistics (#6159)
---
.../hadoop/hive/metastore/conf/MetastoreConf.java | 8 +
.../hadoop/hive/metastore/DatabaseProduct.java | 73 +++++-
.../apache/hadoop/hive/metastore/ObjectStore.java | 260 ++++++++-------------
.../hadoop/hive/metastore/tools/SQLGenerator.java | 6 +-
.../hive/metastore/utils/RetryingExecutor.java | 134 +++++++++++
.../hadoop/hive/metastore/DummyCustomRDBMS.java | 2 +-
.../hadoop/hive/metastore/TestObjectStore.java | 9 +-
7 files changed, 323 insertions(+), 169 deletions(-)
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 71e99b0d32e..0b6fd083123 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1309,6 +1309,14 @@ public enum ConfVars {
+ " seqprefix: adds a 'N_' prefix to the table name to get a
unique location (table,1_table,2_table,...)\n"
+ " prohibit: do not consider alternate locations; throw error if
the default is not available\n"
+ " force: use the default location even in case the directory is
already available"),
+ METASTORE_S4U_NOWAIT_MAX_RETRIES("metastore.s4u.nowait.max.retries",
+ "hive.metastore.s4u.nowait.max.retries", 100,
+ "Number of retries required to acquire a row lock immediately without
waiting."),
+ METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL(
+ "metastore.s4u.nowait.retry.sleep.interval",
+ "hive.metastore.s4u.nowait.retry.sleep.interval", 300,
TimeUnit.MILLISECONDS,
+ "Sleep interval between retries to acquire a row lock immediately
described part of property "
+ + METASTORE_S4U_NOWAIT_MAX_RETRIES.name()),
MULTITHREADED("javax.jdo.option.Multithreaded",
"javax.jdo.option.Multithreaded", true,
"Set this to true if multiple threads access metastore through JDO
concurrently."),
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
index 2b012b9a9bb..686c1e9c371 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java
@@ -32,6 +32,7 @@
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -66,7 +67,9 @@ public class DatabaseProduct implements Configurable {
*/
private static final ReentrantLock derbyLock = new ReentrantLock(true);
- public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM,
UNDEFINED};
+  /**
+   * Kinds of backing database distinguished by this class. The declaration order is kept as is
+   * because it determines each constant's ordinal.
+   */
+  public enum DbType {
+    DERBY,
+    MYSQL,
+    POSTGRES,
+    ORACLE,
+    SQLSERVER,
+    CUSTOM,
+    UNDEFINED
+  };
static public DbType dbType;
// Singleton instance
@@ -75,6 +78,11 @@ public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE,
SQLSERVER, CUSTOM, UNDEFINED
Configuration myConf;
private String productName;
+
+ private String dbVersion;
+
+ private Pair<Integer, Integer> versionNums;
+
/**
* Protected constructor for singleton class
*/
@@ -92,7 +100,10 @@ public static DatabaseProduct
determineDatabaseProduct(DataSource connPool,
Configuration conf) {
try (Connection conn = connPool.getConnection()) {
String s = conn.getMetaData().getDatabaseProductName();
- return determineDatabaseProduct(s, conf);
+ String version = conn.getMetaData().getDatabaseProductVersion();
+ int majorVersion = conn.getMetaData().getDatabaseMajorVersion();
+ int minorVersion = conn.getMetaData().getDatabaseMinorVersion();
+ return determineDatabaseProduct(s, version, Pair.of(majorVersion,
minorVersion), conf);
} catch (SQLException e) {
throw new IllegalStateException("Unable to get database product name",
e);
}
@@ -103,8 +114,12 @@ public static DatabaseProduct
determineDatabaseProduct(DataSource connPool,
* @param productName string to defer database connection
* @return database product type
*/
- public static DatabaseProduct determineDatabaseProduct(String productName,
- Configuration conf) {
+ public static DatabaseProduct determineDatabaseProduct(String productName,
Configuration configuration) {
+ return determineDatabaseProduct(productName, null, null, configuration);
+ }
+
+ private static DatabaseProduct determineDatabaseProduct(String productName,
+ String version, Pair<Integer, Integer> versionNums, Configuration conf) {
DbType dbt;
Preconditions.checkNotNull(conf, "Configuration is null");
@@ -117,6 +132,12 @@ public static DatabaseProduct
determineDatabaseProduct(String productName,
dbt = DbType.CUSTOM;
}
Preconditions.checkState(theDatabaseProduct.dbType == dbt);
+ if (theDatabaseProduct.dbVersion == null && version != null) {
+ theDatabaseProduct.dbVersion = version;
+ }
+ if (theDatabaseProduct.versionNums == null && versionNums != null) {
+ theDatabaseProduct.versionNums = versionNums;
+ }
return theDatabaseProduct;
}
@@ -160,6 +181,12 @@ public static DatabaseProduct
determineDatabaseProduct(String productName,
theDatabaseProduct.dbType = dbt;
theDatabaseProduct.productName = productName;
+ if (version != null) {
+ theDatabaseProduct.dbVersion = version;
+ }
+ if (versionNums != null) {
+ theDatabaseProduct.versionNums = versionNums;
+ }
}
}
return theDatabaseProduct;
@@ -424,24 +451,34 @@ public String isWithinCheckInterval(String expr, long
intervalInSeconds) throws
return condition;
}
- public String addForUpdateClause(String selectStatement) throws
MetaException {
+ public String addForUpdateClause(String selectStatement, boolean noWait)
throws MetaException {
switch (dbType) {
case DERBY:
//https://db.apache.org/derby/docs/10.1/ref/rrefsqlj31783.html
//sadly in Derby, FOR UPDATE doesn't meant what it should
return selectStatement;
- case MYSQL:
- //http://dev.mysql.com/doc/refman/5.7/en/select.html
case ORACLE:
//https://docs.oracle.com/cd/E17952_01/refman-5.6-en/select.html
case POSTGRES:
//http://www.postgresql.org/docs/9.0/static/sql-select.html
case CUSTOM: // ANSI SQL
+ return selectStatement + " for update" + (noWait ? " NOWAIT" : "");
+ case MYSQL:
+ //http://dev.mysql.com/doc/refman/5.7/en/select.html
+ if (noWait) {
+ if (canMySQLSupportNoWait()) {
+ return selectStatement + " for update NOWAIT";
+ } else {
+ int selectLength = "select".length();
+ return selectStatement.trim().substring(0, selectLength) + " /*+
MAX_EXECUTION_TIME(300) */ " +
+ selectStatement.trim().substring(selectLength) + " for update";
+ }
+ }
return selectStatement + " for update";
case SQLSERVER:
//https://msdn.microsoft.com/en-us/library/ms189499.aspx
//https://msdn.microsoft.com/en-us/library/ms187373.aspx
- String modifier = " with (updlock)";
+ String modifier = " with (updlock" + (noWait ? ",NOWAIT" : "") + ")";
int wherePos = selectStatement.toUpperCase().indexOf(" WHERE ");
if (wherePos < 0) {
return selectStatement + modifier;
@@ -455,6 +492,26 @@ public String addForUpdateClause(String selectStatement)
throws MetaException {
}
}
+  /**
+   * Tells whether the backing MySQL or MariaDB server accepts {@code FOR UPDATE NOWAIT}.
+   *
+   * <p>MySQL added the clause in 8.0.1 and MariaDB in 10.3.0. For older servers, or when the
+   * version could not be determined, this returns {@code false} so that the caller can use the
+   * {@code MAX_EXECUTION_TIME} hint instead and the select-for-update (s4u) cannot run indefinitely.
+   *
+   * @return {@code true} only when the recorded server version is known to support NOWAIT
+   */
+  private boolean canMySQLSupportNoWait() {
+    if (versionNums == null) {
+      // Cannot determine the real version of the backing database.
+      return false;
+    }
+    int major = versionNums.getLeft();
+    int minor = versionNums.getRight();
+    // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
+    // NOTE(review): assumes MARIADB_NAME is already lower-case - confirm at its declaration.
+    String dbName = productName.replaceAll("\\s+", "").toLowerCase(java.util.Locale.ROOT);
+    boolean isMariaDB = dbName.contains(MARIADB_NAME) ||
+        (dbVersion != null && dbVersion.toLowerCase(java.util.Locale.ROOT).contains(MARIADB_NAME));
+    if (isMariaDB) {
+      // https://mariadb.com/docs/release-notes/community-server/old-releases/release-notes-mariadb-10-3-series/mariadb-1030-release-notes
+      // Compare the major version first so that 11.x and later are not rejected for having a
+      // small minor number.
+      return major > 10 || (major == 10 && minor > 2);
+    } else {
+      // https://dev.mysql.com/blog-archive/mysql-8-0-1-using-skip-locked-and-nowait-to-handle-hot-rows/
+      // Every 8.x release with a non-zero minor number is newer than 8.0.1. The string compare is
+      // only needed for 8.0.x: "8.0.0" sorts before "8.0.1", every later 8.0.x sorts at or after it.
+      return major > 8 ||
+          (major == 8 && (minor > 0 ||
+              (dbVersion != null && dbVersion.compareToIgnoreCase("8.0.1") >= 0)));
+    }
+  }
+
/**
* Add a limit clause to a given query
* @param numRows
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 2da8955d5dd..659f47e7035 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -52,9 +52,10 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
+import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -67,7 +68,6 @@
import javax.jdo.datastore.JDOConnection;
import javax.jdo.identity.IntIdentity;
-import com.google.common.util.concurrent.Striped;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
@@ -254,6 +254,7 @@
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.RetryingExecutor;
import org.apache.thrift.TException;
import org.datanucleus.ExecutionContext;
import org.datanucleus.api.jdo.JDOPersistenceManager;
@@ -344,8 +345,6 @@ private enum TXN_STATUS {
private boolean areTxnStatsSupported = false;
private PropertyStore propertyStore;
- private static Striped<Lock> tablelocks;
-
public ObjectStore() {
}
@@ -395,15 +394,6 @@ public void setConf(Configuration conf) {
} else {
LOG.debug("Initialized ObjectStore");
}
-
- if (tablelocks == null) {
- synchronized (ObjectStore.class) {
- if (tablelocks == null) {
- int numTableLocks = MetastoreConf.getIntVar(conf,
ConfVars.METASTORE_NUM_STRIPED_TABLE_LOCKS);
- tablelocks = Striped.lazyWeakLock(numTableLocks);
- }
- }
- }
}
@SuppressWarnings("nls")
@@ -9165,98 +9155,97 @@ private void writeMPartitionColumnStatistics(Table
table, Partition partition,
}
}
- /**
- * Get table's column stats
- *
- * @return Map of column name and its stats
- */
- private Map<String, MTableColumnStatistics> getPartitionColStats(Table
table, List<String> colNames, String engine)
- throws MetaException {
- Map<String, MTableColumnStatistics> statsMap = Maps.newHashMap();
- List<MTableColumnStatistics> stats = getMTableColumnStatistics(table,
colNames, engine);
- for (MTableColumnStatistics cStat : stats) {
- statsMap.put(cStat.getColName(), cStat);
- }
- return statsMap;
- }
-
@Override
public Map<String, String> updateTableColumnStatistics(ColumnStatistics
colStats, String validWriteIds, long writeId)
throws NoSuchObjectException, MetaException, InvalidObjectException,
InvalidInputException {
boolean committed = false;
-
List<ColumnStatisticsObj> statsObjs = colStats.getStatsObj();
ColumnStatisticsDesc statsDesc = colStats.getStatsDesc();
-
- Lock tableLock = getTableLockFor(statsDesc.getDbName(),
statsDesc.getTableName());
- tableLock.lock();
+ long start = System.currentTimeMillis();
+ String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() :
getDefaultCatalog(conf);
try {
openTransaction();
// DataNucleus objects get detached all over the place for no (real)
reason.
// So let's not use them anywhere unless absolutely necessary.
- String catName = statsDesc.isSetCatName() ? statsDesc.getCatName() :
getDefaultCatalog(conf);
MTable mTable = ensureGetMTable(catName, statsDesc.getDbName(),
statsDesc.getTableName());
- Table table = convertToTable(mTable);
- List<String> colNames = new ArrayList<>();
- for (ColumnStatisticsObj statsObj : statsObjs) {
- colNames.add(statsObj.getColName());
- }
-
- Map<String, MTableColumnStatistics> oldStats =
getPartitionColStats(table, colNames, colStats.getEngine());
-
- for (ColumnStatisticsObj statsObj : statsObjs) {
- MTableColumnStatistics mStatsObj =
StatObjectConverter.convertToMTableColumnStatistics(
- mTable, statsDesc,
- statsObj, colStats.getEngine());
- writeMTableColumnStatistics(table, mStatsObj,
oldStats.get(statsObj.getColName()));
- // There is no need to add colname again, otherwise we will get
duplicate colNames.
- }
-
- // TODO: (HIVE-20109) ideally the col stats stats should be in colstats,
not in the table!
- // Set the table properties
- // No need to check again if it exists.
- String dbname = table.getDbName();
- String name = table.getTableName();
- MTable oldt = mTable;
- Map<String, String> newParams = new HashMap<>(table.getParameters());
- StatsSetupConst.setColumnStatsState(newParams, colNames);
- boolean isTxn = TxnUtils.isTransactionalTable(oldt.getParameters());
- if (isTxn) {
- if (!areTxnStatsSupported) {
- StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
- } else {
- String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname,
name),
- oldt.getParameters(), newParams, writeId, validWriteIds, true);
- if (errorMsg != null) {
- throw new MetaException(errorMsg);
- }
- if (!isCurrentStatsValidForTheQuery(oldt, validWriteIds, true)) {
- // Make sure we set the flag to invalid regardless of the current
value.
+ int maxRetries = MetastoreConf.getIntVar(conf,
ConfVars.METASTORE_S4U_NOWAIT_MAX_RETRIES);
+ long sleepInterval = MetastoreConf.getTimeVar(conf,
+ ConfVars.METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL,
TimeUnit.MILLISECONDS);
+ Map<String, String> result = new RetryingExecutor<>(maxRetries, () -> {
+ Ref<Exception> exceptionRef = new Ref<>();
+ String savePoint = "uts_" + ThreadLocalRandom.current().nextInt(10000)
+ "_" + System.nanoTime();
+ setTransactionSavePoint(savePoint);
+ executePlainSQL(
+ sqlGenerator.addForUpdateNoWait("SELECT \"TBL_ID\" FROM \"TBLS\"
WHERE \"TBL_ID\" = " + mTable.getId()),
+ exception -> {
+ rollbackTransactionToSavePoint(savePoint);
+ exceptionRef.t = exception;
+ });
+ if (exceptionRef.t != null) {
+ throw new RetryingExecutor.RetryException(exceptionRef.t);
+ }
+ pm.refresh(mTable);
+ Table table = convertToTable(mTable);
+ List<String> colNames = new ArrayList<>();
+ for (ColumnStatisticsObj statsObj : statsObjs) {
+ colNames.add(statsObj.getColName());
+ }
+
+ Map<String, MTableColumnStatistics> oldStats = Maps.newHashMap();
+ List<MTableColumnStatistics> stats = getMTableColumnStatistics(table,
colNames, colStats.getEngine());
+ for (MTableColumnStatistics cStat : stats) {
+ oldStats.put(cStat.getColName(), cStat);
+ }
+
+ for (ColumnStatisticsObj statsObj : statsObjs) {
+ MTableColumnStatistics mStatsObj =
StatObjectConverter.convertToMTableColumnStatistics(mTable, statsDesc,
+ statsObj, colStats.getEngine());
+ writeMTableColumnStatistics(table, mStatsObj,
oldStats.get(statsObj.getColName()));
+ // There is no need to add colname again, otherwise we will get
duplicate colNames.
+ }
+
+ // TODO: (HIVE-20109) ideally the col stats stats should be in
colstats, not in the table!
+ // Set the table properties
+ // No need to check again if it exists.
+ String dbname = table.getDbName();
+ String name = table.getTableName();
+ MTable oldt = mTable;
+ Map<String, String> newParams = new HashMap<>(table.getParameters());
+ StatsSetupConst.setColumnStatsState(newParams, colNames);
+ boolean isTxn = TxnUtils.isTransactionalTable(oldt.getParameters());
+ if (isTxn) {
+ if (!areTxnStatsSupported) {
StatsSetupConst.setBasicStatsState(newParams,
StatsSetupConst.FALSE);
- LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the
table "
- + dbname + "." + name);
+ } else {
+ String errorMsg =
verifyStatsChangeCtx(TableName.getDbTable(dbname, name), oldt.getParameters(),
newParams,
+ writeId, validWriteIds, true);
+ if (errorMsg != null) {
+ throw new MetaException(errorMsg);
+ }
+ if (!isCurrentStatsValidForTheQuery(oldt, validWriteIds, true)) {
+ // Make sure we set the flag to invalid regardless of the
current value.
+ StatsSetupConst.setBasicStatsState(newParams,
StatsSetupConst.FALSE);
+ LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of
the table " + dbname + "." + name);
+ }
+ oldt.setWriteId(writeId);
}
- oldt.setWriteId(writeId);
}
- }
- oldt.setParameters(newParams);
-
+ oldt.setParameters(newParams);
+ return newParams;
+ }).onRetry(e -> e instanceof RetryingExecutor.RetryException)
+
.commandName("updateTableColumnStatistics").sleepInterval(sleepInterval,
interval ->
+ ThreadLocalRandom.current().nextLong(sleepInterval) + 30).run();
committed = commitTransaction();
// TODO: similar to update...Part, this used to do "return committed;";
makes little sense.
- return committed ? newParams : null;
+ return committed ? result : null;
} finally {
- try {
- rollbackAndCleanup(committed, null);
- } finally {
- tableLock.unlock();
- }
+ LOG.debug("{} updateTableColumnStatistics took {}ms, success: {}",
+ new TableName(catName, statsDesc.getDbName(),
statsDesc.getTableName()),
+ System.currentTimeMillis() - start, committed);
+ rollbackAndCleanup(committed, null);
}
}
- private Lock getTableLockFor(String dbName, String tblName) {
- return tablelocks.get(dbName + "." + tblName);
- }
-
/**
* Get partition's column stats
*
@@ -11091,93 +11080,50 @@ public List<WriteEventInfo> getAllWriteEventInfo(long
txnId, String dbName, Stri
return writeEventInfoList;
}
- private void prepareQuotes() throws SQLException {
+ private void executePlainSQL(String sql, Consumer<Exception>
exceptionConsumer)
+ throws SQLException {
String s = dbType.getPrepareTxnStmt();
- if (s != null) {
- assert pm.currentTransaction().isActive();
- JDOConnection jdoConn = pm.getDataStoreConnection();
- try (Statement statement = ((Connection)
jdoConn.getNativeConnection()).createStatement()) {
+ assert pm.currentTransaction().isActive();
+ JDOConnection jdoConn = pm.getDataStoreConnection();
+ Connection conn = (Connection) jdoConn.getNativeConnection();
+ try (Statement statement = conn.createStatement()) {
+ if (s != null) {
statement.execute(s);
- } finally {
- jdoConn.close();
}
+ try {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ if (exceptionConsumer != null) {
+ exceptionConsumer.accept(e);
+ } else {
+ throw e;
+ }
+ }
+ } finally {
+ jdoConn.close();
}
}
private void lockNotificationSequenceForUpdate() throws MetaException {
+ int maxRetries =
+ MetastoreConf.getIntVar(conf,
ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES);
+ long sleepInterval = MetastoreConf.getTimeVar(conf,
+ ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL,
TimeUnit.MILLISECONDS);
if (sqlGenerator.getDbProduct().isDERBY() && directSql != null) {
// Derby doesn't allow FOR UPDATE to lock the row being selected (See
https://db.apache
// .org/derby/docs/10.1/ref/rrefsqlj31783.html) . So lock the whole
table. Since there's
// only one row in the table, this shouldn't cause any performance
degradation.
- new RetryingExecutor(conf, () -> {
+ new RetryingExecutor<Void>(maxRetries, () -> {
directSql.lockDbTable("NOTIFICATION_SEQUENCE");
- }).run();
+ return null;
+
}).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).run();
} else {
String selectQuery = "select \"NEXT_EVENT_ID\" from
\"NOTIFICATION_SEQUENCE\"";
String lockingQuery = sqlGenerator.addForUpdateClause(selectQuery);
- new RetryingExecutor(conf, () -> {
- prepareQuotes();
- try (QueryWrapper query = new
QueryWrapper(pm.newQuery("javax.jdo.query.SQL", lockingQuery))) {
- query.setUnique(true);
- // only need to execute it to get db Lock
- query.execute();
- }
- }).run();
- }
- }
-
- static class RetryingExecutor {
- interface Command {
- void process() throws Exception;
- }
-
- private static Logger LOG =
LoggerFactory.getLogger(RetryingExecutor.class);
- private final int maxRetries;
- private final long sleepInterval;
- private int currentRetries = 0;
- private final Command command;
-
- RetryingExecutor(Configuration config, Command command) {
- this.maxRetries =
- MetastoreConf.getIntVar(config,
ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES);
- this.sleepInterval = MetastoreConf.getTimeVar(config,
- ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL,
TimeUnit.MILLISECONDS);
- this.command = command;
- }
-
- public void run() throws MetaException {
- while (true) {
- try {
- command.process();
- break;
- } catch (Exception e) {
- LOG.info(
- "Attempting to acquire the DB log notification lock: {} out of
{}" +
- " retries", currentRetries, maxRetries, e);
- if (currentRetries >= maxRetries) {
- String message =
- "Couldn't acquire the DB log notification lock because we
reached the maximum"
- + " # of retries: " + maxRetries
- + " retries. If this happens too often, then is
recommended to "
- + "increase the maximum number of retries on the"
- + " hive.notification.sequence.lock.max.retries
configuration";
- LOG.error(message, e);
- throw new MetaException(message + " :: " + e.getMessage());
- }
- currentRetries++;
- try {
- Thread.sleep(sleepInterval);
- } catch (InterruptedException e1) {
- String msg = "Couldn't acquire the DB notification log lock on " +
currentRetries
- + " retry, because the following error: ";
- LOG.error(msg, e1);
- throw new MetaException(msg + e1.getMessage());
- }
- }
- }
- }
- public long getSleepInterval() {
- return sleepInterval;
+ new RetryingExecutor<Void>(maxRetries, () -> {
+ executePlainSQL(lockingQuery, null);
+ return null;
+
}).commandName("lockNotificationSequenceForUpdate").sleepInterval(sleepInterval).run();
}
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index b793345dd94..7b3382265b2 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -127,7 +127,11 @@ private List<String> createInsertValuesStmt(String
tblColumns, List<String> rows
* construct. If the DB doesn't support, return original select.
*/
public String addForUpdateClause(String selectStatement) throws
MetaException {
- return dbProduct.addForUpdateClause(selectStatement);
+ return dbProduct.addForUpdateClause(selectStatement, false);
+ }
+
+  /**
+   * Same as {@link #addForUpdateClause(String)}, except that the database product is asked for the
+   * no-wait form of the locking clause.
+   *
+   * @param selectStatement select statement to decorate
+   * @return the statement produced by the database product for a no-wait select-for-update
+   * @throws MetaException propagated from the database product
+   */
+  public String addForUpdateNoWait(String selectStatement) throws MetaException {
+    final boolean noWait = true;
+    return dbProduct.addForUpdateClause(selectStatement, noWait);
   }
/**
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java
new file mode 100644
index 00000000000..dcaa10be831
--- /dev/null
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.utils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.concurrent.Callable;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs a {@link Callable} and, when it throws, sleeps and runs it again until it succeeds or the
+ * retry budget is used up.
+ *
+ * <p>Configuration is fluent: {@link #onRetry(Predicate)} restricts which exceptions are retried,
+ * {@link #commandName(String)} sets the name used in log and error messages, and
+ * {@link #sleepInterval(long, Function)} controls the pause between attempts. The retry counter is
+ * an instance field that is never reset, so a second {@link #run()} on the same instance continues
+ * counting from where the first one stopped.
+ *
+ * @param <T> type of the value produced by the command
+ */
+public class RetryingExecutor<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
+
+  private final int maxRetries;
+  private long sleepInterval = 1000;
+  private final Callable<T> command;
+  private Predicate<Exception> retryPolicy;
+  private int currentRetries = 0;
+  private String commandName;
+  private Function<Long, Long> sleepIntervalFunc;
+
+  /**
+   * Creates an executor for the given command.
+   *
+   * @param maxRetries number of retries allowed after the first failed attempt; with {@code 0} the
+   *                   command is attempted exactly once
+   * @param command    the work to execute; it is only invoked by {@link #run()}
+   */
+  public RetryingExecutor(int maxRetries, Callable<T> command) {
+    this.maxRetries = maxRetries;
+    this.command = command;
+    // Default commandName unless specified: the name of the method that invoked this constructor
+    // (frame 0 is the constructor itself, hence skip(1)).
+    this.commandName = StackWalker.getInstance()
+        .walk(frames -> frames
+            .skip(1)
+            .findFirst()
+            .map(StackWalker.StackFrame::getMethodName))
+        .orElse("unknown");
+  }
+
+  /**
+   * Sets the predicate deciding whether a failure may be retried. When no predicate is set every
+   * exception is retried; when it is set, an exception it rejects aborts {@link #run()} at once.
+   *
+   * @param retryPolicy returns {@code true} for exceptions that should trigger a retry
+   * @return this executor
+   */
+  public RetryingExecutor<T> onRetry(Predicate<Exception> retryPolicy) {
+    this.retryPolicy = retryPolicy;
+    return this;
+  }
+
+  /**
+   * Overrides the command name used in log and error messages.
+   *
+   * @param name name to report
+   * @return this executor
+   */
+  public RetryingExecutor<T> commandName(String name) {
+    this.commandName = name;
+    return this;
+  }
+
+  /**
+   * Sets a fixed sleep interval between attempts and removes any interval function.
+   *
+   * @param sleepInterval pause in milliseconds
+   * @return this executor
+   */
+  public RetryingExecutor<T> sleepInterval(long sleepInterval) {
+    return sleepInterval(sleepInterval, null);
+  }
+
+  /**
+   * Sets the initial sleep interval and an optional function that derives the next interval from
+   * the current one (see {@link #getSleepInterval()}).
+   *
+   * @param sleepInterval     initial pause in milliseconds
+   * @param sleepIntervalFunc maps the current interval to the next one; may be {@code null}
+   * @return this executor
+   */
+  public RetryingExecutor<T> sleepInterval(long sleepInterval,
+      Function<Long, Long> sleepIntervalFunc) {
+    this.sleepInterval = sleepInterval;
+    this.sleepIntervalFunc = sleepIntervalFunc;
+    return this;
+  }
+
+  /**
+   * Executes the command, retrying after a sleep whenever it throws a retryable exception.
+   *
+   * @return the value returned by the first successful attempt
+   * @throws MetaException if the retry policy rejects an exception, the maximum number of retries
+   *                       is reached, or the thread is interrupted while sleeping; the triggering
+   *                       exception is attached as the cause
+   */
+  public T run() throws MetaException {
+    while (true) {
+      try {
+        return command.call();
+      } catch (Exception e) {
+        checkException(e);
+        LOG.info("Attempting to retry the command:{} in {} out of {} retries",
+            commandName, currentRetries, maxRetries, e);
+        if (currentRetries >= maxRetries) {
+          String message = "Couldn't finish the command: " + commandName +
+              " because we reached the maximum of retries: " + maxRetries;
+          LOG.error(message, e);
+          throw newMetaException(message + " :: " + e.getMessage(), e);
+        }
+        currentRetries++;
+        try {
+          // Thread.sleep rejects negative values, so clamp whatever the interval function returned.
+          Thread.sleep(Math.max(0L, getSleepInterval()));
+        } catch (InterruptedException e1) {
+          // Restore the flag so code further up the stack can still observe the interruption.
+          Thread.currentThread().interrupt();
+          String msg = "Couldn't run the command: " + commandName + " in " + currentRetries +
+              " retry, because the following error: ";
+          LOG.error(msg, e1);
+          throw newMetaException(msg + e1.getMessage(), e1);
+        }
+      }
+    }
+  }
+
+  /**
+   * Aborts with a {@link MetaException} when a retry policy is set and it rejects the exception.
+   *
+   * @param e the exception thrown by the command
+   * @throws MetaException if the exception must not be retried
+   */
+  private void checkException(Exception e) throws MetaException {
+    if (retryPolicy == null || retryPolicy.test(e)) {
+      return;
+    }
+    String message = "See a fatal exception, avoid to retry the command:" + commandName;
+    LOG.info(message, e);
+    String errorMessage = ExceptionUtils.getMessage(e);
+    if (e instanceof InvocationTargetException || e instanceof UndeclaredThrowableException) {
+      // These two only wrap the real failure, so report the wrapped exception instead.
+      errorMessage = ExceptionUtils.getMessage(e.getCause());
+    }
+    Throwable rootCause = ExceptionUtils.getRootCause(e);
+    errorMessage += (rootCause == null ? "" : ("\nRoot cause: " + rootCause));
+    throw newMetaException(message + " :: " + errorMessage, e);
+  }
+
+  /** Builds a {@link MetaException} with the given message and keeps the cause for diagnostics. */
+  private static MetaException newMetaException(String message, Throwable cause) {
+    MetaException metaException = new MetaException(message);
+    metaException.initCause(cause);
+    return metaException;
+  }
+
+  /** Marker exception a command throws to signal that the attempt should be retried. */
+  public static class RetryException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public RetryException(Exception ex) {
+      super(ex);
+    }
+
+    public RetryException(String msg) {
+      super(msg);
+    }
+  }
+
+  /**
+   * Returns the interval to sleep before the next attempt. When an interval function is set it is
+   * applied to the current interval and the result is stored, so every call advances the interval
+   * (this is what allows a function such as a back-off to build on the previous value).
+   *
+   * @return the sleep interval in milliseconds
+   */
+  public long getSleepInterval() {
+    if (sleepIntervalFunc != null) {
+      this.sleepInterval = sleepIntervalFunc.apply(sleepInterval);
+    }
+    return sleepInterval;
+  }
+}
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
index 444edd8812f..aa2ba60934a 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/DummyCustomRDBMS.java
@@ -74,7 +74,7 @@ public String isWithinCheckInterval(String expr, long
intervalInSeconds) {
return "DummyIsWithin";
}
@Override
- public String addForUpdateClause(String selectStatement) {
+ public String addForUpdateClause(String selectStatement, boolean noWait) {
return selectStatement + " for update";
}
@Override
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 07af321300e..f755aec6b19 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -21,7 +21,6 @@
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.hive.metastore.ObjectStore.RetryingExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
import org.apache.hadoop.hive.metastore.api.Catalog;
@@ -81,6 +80,7 @@
import org.apache.hadoop.hive.metastore.model.MNotificationNextId;
import org.apache.hadoop.hive.metastore.model.MTable;
import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
+import org.apache.hadoop.hive.metastore.utils.RetryingExecutor;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
@@ -1273,7 +1273,12 @@ public void testQueryCloseOnError() throws Exception {
@Test
public void testRetryingExecutorSleep() throws Exception {
- RetryingExecutor re = new
ObjectStore.RetryingExecutor(MetastoreConf.newMetastoreConf(), null);
+ int maxRetries =
+ MetastoreConf.getIntVar(conf,
ConfVars.NOTIFICATION_SEQUENCE_LOCK_MAX_RETRIES);
+ long sleepInterval = MetastoreConf.getTimeVar(conf,
+ ConfVars.NOTIFICATION_SEQUENCE_LOCK_RETRY_SLEEP_INTERVAL,
TimeUnit.MILLISECONDS);
+ RetryingExecutor<Void> re = new RetryingExecutor<Void>(maxRetries, null)
+ .sleepInterval(sleepInterval);
Assert.assertTrue("invalid sleep value", re.getSleepInterval() >= 0);
}