veghlaci05 commented on code in PR #4566:
URL: https://github.com/apache/hive/pull/4566#discussion_r1422534487
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6052,450 +1096,53 @@ protected synchronized static DataSource
setupJdbcConnectionPool(Configuration c
throw new RuntimeException(e);
}
} else {
- String connectionPooler = MetastoreConf.getVar(conf,
ConfVars.CONNECTION_POOLING_TYPE).toLowerCase();
+ String connectionPooler = MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE).toLowerCase();
if ("none".equals(connectionPooler)) {
LOG.info("Choosing not to pool JDBC connections");
- return new NoPoolConnectionPool(conf);
+ return new NoPoolConnectionPool(conf, dbProduct);
} else {
throw new RuntimeException("Unknown JDBC connection pooling " +
connectionPooler);
}
}
}
- /**
- * Returns true if {@code ex} should be retried
- */
- static boolean isRetryable(Configuration conf, Exception ex) {
- if(ex instanceof SQLException) {
- SQLException sqlException = (SQLException)ex;
- if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) {
- // Manual retry exception was thrown
- return true;
- }
- if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
- //in MSSQL this means Communication Link Failure
- return true;
- }
- if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
- sqlException.getMessage().contains("consistent read failure; rollback
data not available")) {
- return true;
- }
-
- String regex = MetastoreConf.getVar(conf,
ConfVars.TXN_RETRYABLE_SQLEX_REGEX);
- if (regex != null && !regex.isEmpty()) {
- String[] patterns = regex.split(",(?=\\S)");
- String message = getMessage((SQLException)ex);
- for (String p : patterns) {
- if (Pattern.matches(p, message)) {
- return true;
- }
- }
- }
- //see also https://issues.apache.org/jira/browse/HIVE-9938
- }
- return false;
- }
-
- private boolean isDuplicateKeyError(SQLException ex) {
- return dbProduct.isDuplicateKeyError(ex);
- }
-
- private static String getMessage(SQLException ex) {
- return ex.getMessage() + " (SQLState=" + ex.getSQLState() + ", ErrorCode="
+ ex.getErrorCode() + ")";
- }
- static String quoteString(String input) {
- return "'" + input + "'";
- }
- static String quoteChar(char c) {
- return "'" + c + "'";
- }
-
- /**
- * {@link #lockInternal()} and {@link #unlockInternal()} are used to
serialize those operations that require
- * Select ... For Update to sequence operations properly. In practice that
means when running
- * with Derby database. See more notes at class level.
- */
- protected void lockInternal() {
- if(dbProduct.isDERBY()) {
- derbyLock.lock();
- }
- }
- protected void unlockInternal() {
- if(dbProduct.isDERBY()) {
- derbyLock.unlock();
- }
- }
@Override
@RetrySemantics.Idempotent
public MutexAPI getMutexAPI() {
Review Comment:
removed
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6052,450 +1096,53 @@ protected synchronized static DataSource
setupJdbcConnectionPool(Configuration c
throw new RuntimeException(e);
}
} else {
- String connectionPooler = MetastoreConf.getVar(conf,
ConfVars.CONNECTION_POOLING_TYPE).toLowerCase();
+ String connectionPooler = MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE).toLowerCase();
if ("none".equals(connectionPooler)) {
LOG.info("Choosing not to pool JDBC connections");
- return new NoPoolConnectionPool(conf);
+ return new NoPoolConnectionPool(conf, dbProduct);
} else {
throw new RuntimeException("Unknown JDBC connection pooling " +
connectionPooler);
}
}
}
- /**
- * Returns true if {@code ex} should be retried
- */
- static boolean isRetryable(Configuration conf, Exception ex) {
- if(ex instanceof SQLException) {
- SQLException sqlException = (SQLException)ex;
- if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) {
- // Manual retry exception was thrown
- return true;
- }
- if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
- //in MSSQL this means Communication Link Failure
- return true;
- }
- if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
- sqlException.getMessage().contains("consistent read failure; rollback
data not available")) {
- return true;
- }
-
- String regex = MetastoreConf.getVar(conf,
ConfVars.TXN_RETRYABLE_SQLEX_REGEX);
- if (regex != null && !regex.isEmpty()) {
- String[] patterns = regex.split(",(?=\\S)");
- String message = getMessage((SQLException)ex);
- for (String p : patterns) {
- if (Pattern.matches(p, message)) {
- return true;
- }
- }
- }
- //see also https://issues.apache.org/jira/browse/HIVE-9938
- }
- return false;
- }
-
- private boolean isDuplicateKeyError(SQLException ex) {
- return dbProduct.isDuplicateKeyError(ex);
- }
-
- private static String getMessage(SQLException ex) {
- return ex.getMessage() + " (SQLState=" + ex.getSQLState() + ", ErrorCode="
+ ex.getErrorCode() + ")";
- }
- static String quoteString(String input) {
- return "'" + input + "'";
- }
- static String quoteChar(char c) {
- return "'" + c + "'";
- }
-
- /**
- * {@link #lockInternal()} and {@link #unlockInternal()} are used to
serialize those operations that require
- * Select ... For Update to sequence operations properly. In practice that
means when running
- * with Derby database. See more notes at class level.
- */
- protected void lockInternal() {
- if(dbProduct.isDERBY()) {
- derbyLock.lock();
- }
- }
- protected void unlockInternal() {
- if(dbProduct.isDERBY()) {
- derbyLock.unlock();
- }
- }
@Override
@RetrySemantics.Idempotent
public MutexAPI getMutexAPI() {
- return this;
+ return mutexAPI;
}
@Override
public LockHandle acquireLock(String key) throws MetaException {
- /**
- * The implementation here is a bit kludgey but done so that code
exercised by unit tests
- * (which run against Derby which has no support for select for update) is
as similar to
- * production code as possible.
- * In particular, with Derby we always run in a single process with a
single metastore and
- * the absence of For Update is handled via a Semaphore. The later would
strictly speaking
- * make the SQL statements below unnecessary (for Derby), but then they
would not be tested.
- */
- Connection dbConn = null;
- Statement stmt = null;
- ResultSet rs = null;
- boolean needToCloseConn = true;
- try {
- try {
- String sqlStmt = sqlGenerator.addForUpdateClause("SELECT
\"MT_COMMENT\", \"MT_KEY2\" FROM \"AUX_TABLE\" WHERE \"MT_KEY1\"=" +
quoteString(key));
- lockInternal();
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED,
connPoolMutex);
- stmt = dbConn.createStatement();
- LOG.debug("About to execute SQL: {}", sqlStmt);
- rs = stmt.executeQuery(sqlStmt);
- if (!rs.next()) {
- close(rs);
- try {
- stmt.executeUpdate("INSERT INTO \"AUX_TABLE\" (\"MT_KEY1\",
\"MT_KEY2\") VALUES(" + quoteString(key) + ", 0)");
- dbConn.commit();
- } catch (SQLException ex) {
- if (!isDuplicateKeyError(ex)) {
- throw new RuntimeException("Unable to lock " + quoteString(key)
+ " due to: " + getMessage(ex), ex);
- }
- //if here, it means a concrurrent acquireLock() inserted the 'key'
-
- //rollback is done for the benefit of Postgres which throws
(SQLState=25P02, ErrorCode=0) if
- //you attempt any stmt in a txn which had an error.
- dbConn.rollback();
- }
- rs = stmt.executeQuery(sqlStmt);
- if (!rs.next()) {
- throw new IllegalStateException("Unable to lock " +
quoteString(key) + ". Expected row in AUX_TABLE is missing.");
- }
- }
- Semaphore derbySemaphore = null;
- if(dbProduct.isDERBY()) {
- derbyKey2Lock.putIfAbsent(key, new Semaphore(1));
- derbySemaphore = derbyKey2Lock.get(key);
- derbySemaphore.acquire();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} locked by {}", quoteString(key),
quoteString(TxnHandler.hostname));
- }
- needToCloseConn = false; //The connection is good, we need not close
it
- //OK, so now we have a lock
- return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore);
- } catch (SQLException ex) {
- checkRetryable(ex, "acquireLock(" + key + ")");
- throw new MetaException("Unable to lock " + quoteString(key) + " due
to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex));
- }
- catch(InterruptedException ex) {
- throw new MetaException("Unable to lock " + quoteString(key) + " due
to: " + ex.getMessage() + StringUtils.stringifyException(ex));
- }
- finally {
- if (needToCloseConn) {
- rollbackDBConn(dbConn);
- close(rs, stmt, dbConn);
- }
- unlockInternal();
- }
- }
- catch(RetryException ex) {
- return acquireLock(key);
- }
+ return mutexAPI.acquireLock(key);
}
@Override
- public void acquireLock(String key, LockHandle handle) {
- //the idea is that this will use LockHandle.dbConn
- throw new NotImplementedException("acquireLock(String, LockHandle) is not
implemented");
+ public void acquireLock(String key, LockHandle handle) throws MetaException {
+ mutexAPI.acquireLock(key, handle);
}
/**
* Acquire the global txn lock, used to mutex the openTxn and commitTxn.
* @param shared either SHARED_READ or EXCLUSIVE
- * @throws SQLException
*/
- private void acquireTxnLock(Statement stmt, boolean shared) throws
SQLException, MetaException {
+ private void acquireTxnLock(boolean shared) throws MetaException {
String sqlStmt = sqlGenerator.createTxnLockStatement(shared);
- stmt.execute(sqlStmt);
- LOG.debug("TXN lock locked by {} in mode {}",
quoteString(TxnHandler.hostname), shared);
- }
-
- private static final class LockHandleImpl implements LockHandle {
- private final Connection dbConn;
- private final Statement stmt;
- private final ResultSet rs;
- private final Semaphore derbySemaphore;
- private final String key;
- private final Long lastUpdateTime;
-
- LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key,
Semaphore derbySemaphore) {
- this.dbConn = conn;
- this.stmt = stmt;
- this.rs = rs;
- this.derbySemaphore = derbySemaphore;
- if(derbySemaphore != null) {
- //oterwise it may later release permit acquired by someone else
- assert derbySemaphore.availablePermits() == 0 : "Expected locked
Semaphore";
- }
- this.key = key;
- Long lastUpdateTime;
- try {
- lastUpdateTime = rs.getLong("MT_KEY2");
- } catch (SQLException e) {
- LOG.warn("Couldn't resolve MT_KEY2 for MT_KEY1=" +
quoteString(this.key), e);
- lastUpdateTime = -1L;
- }
- this.lastUpdateTime = lastUpdateTime;
- }
-
- @Override
- public void releaseLocks() {
- rollbackDBConn(dbConn);
- TxnHandler.close(rs, stmt, dbConn);
- if(derbySemaphore != null) {
- derbySemaphore.release();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} unlocked by {}", quoteString(key),
quoteString(TxnHandler.hostname));
- }
- }
-
- @Override
- public Long getLastUpdateTime() {
- return lastUpdateTime;
- }
-
- @Override
- public void releaseLocks(Long timestamp) {
- try {
- stmt.executeUpdate("UPDATE \"AUX_TABLE\" SET \"MT_KEY2\" = "+
timestamp + " WHERE \"MT_KEY1\"=" + quoteString(key));
- dbConn.commit();
- } catch (SQLException ex) {
- LOG.warn("Unable to update MT_KEY2 value for MT_KEY1=" + key, ex);
- rollbackDBConn(dbConn);
- }
- TxnHandler.close(rs, stmt, dbConn);
- if(derbySemaphore != null) {
- derbySemaphore.release();
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} unlocked by {}", quoteString(key),
quoteString(TxnHandler.hostname));
- }
- }
-
- @Override
- public void close() {
- releaseLocks();
- }
-
- }
-
-
- private static class NoPoolConnectionPool implements DataSource {
- // Note that this depends on the fact that no-one in this class calls
anything but
- // getConnection. If you want to use any of the Logger or wrap calls
you'll have to
- // implement them.
- private final Configuration conf;
- private Driver driver;
- private String connString;
- private String user;
- private String passwd;
-
- public NoPoolConnectionPool(Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public Connection getConnection() throws SQLException {
- if (user == null) {
- user = DataSourceProvider.getMetastoreJdbcUser(conf);
- passwd = DataSourceProvider.getMetastoreJdbcPasswd(conf);
- }
- return getConnection(user, passwd);
- }
-
- @Override
- public Connection getConnection(String username, String password) throws
SQLException {
- // Find the JDBC driver
- if (driver == null) {
- String driverName = MetastoreConf.getVar(conf,
ConfVars.CONNECTION_DRIVER);
- if (driverName == null || driverName.equals("")) {
- String msg = "JDBC driver for transaction db not set in
configuration " +
- "file, need to set " + ConfVars.CONNECTION_DRIVER.getVarname();
- LOG.error(msg);
- throw new RuntimeException(msg);
- }
- try {
- LOG.info("Going to load JDBC driver {}", driverName);
- driver = (Driver) Class.forName(driverName).newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException("Unable to instantiate driver " +
driverName + ", " +
- e.getMessage(), e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(
- "Unable to access driver " + driverName + ", " + e.getMessage(),
- e);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Unable to find driver " + driverName +
", " + e.getMessage(),
- e);
- }
- connString = MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY);
- }
-
- try {
- LOG.info("Connecting to transaction db with connection string {}",
connString);
- Properties connectionProps = new Properties();
- connectionProps.setProperty("user", username);
- connectionProps.setProperty("password", password);
- Connection conn = driver.connect(connString, connectionProps);
- String prepareStmt = dbProduct != null ? dbProduct.getPrepareTxnStmt()
: null;
- if (prepareStmt != null) {
- try (Statement stmt = conn.createStatement()) {
- stmt.execute(prepareStmt);
- }
- }
- conn.setAutoCommit(false);
- return conn;
- } catch (SQLException e) {
- throw new RuntimeException("Unable to connect to transaction manager
using " + connString
- + ", " + e.getMessage(), e);
- }
- }
-
- @Override
- public PrintWriter getLogWriter() throws SQLException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setLogWriter(PrintWriter out) throws SQLException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setLoginTimeout(int seconds) throws SQLException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int getLoginTimeout() throws SQLException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public java.util.logging.Logger getParentLogger() throws
SQLFeatureNotSupportedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T> T unwrap(Class<T> iface) throws SQLException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isWrapperFor(Class<?> iface) throws SQLException {
- throw new UnsupportedOperationException();
- }
+ jdbcResource.getJdbcTemplate().getJdbcTemplate().execute((Statement stmt)
-> {
+ stmt.execute(sqlStmt);
+ return null;
+ });
+ LOG.debug("TXN lock locked by '{}' in mode {}", TxnHandler.hostname,
shared);
}
@Override
@RetrySemantics.SafeToRetry
Review Comment:
removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]