veghlaci05 commented on code in PR #4566:
URL: https://github.com/apache/hive/pull/4566#discussion_r1422534487


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6052,450 +1096,53 @@ protected synchronized static DataSource 
setupJdbcConnectionPool(Configuration c
         throw new RuntimeException(e);
       }
     } else {
-      String connectionPooler = MetastoreConf.getVar(conf, 
ConfVars.CONNECTION_POOLING_TYPE).toLowerCase();
+      String connectionPooler = MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE).toLowerCase();
       if ("none".equals(connectionPooler)) {
         LOG.info("Choosing not to pool JDBC connections");
-        return new NoPoolConnectionPool(conf);
+        return new NoPoolConnectionPool(conf, dbProduct);
       } else {
         throw new RuntimeException("Unknown JDBC connection pooling " + 
connectionPooler);
       }
     }
   }
 
-  /**
-   * Returns true if {@code ex} should be retried
-   */
-  static boolean isRetryable(Configuration conf, Exception ex) {
-    if(ex instanceof SQLException) {
-      SQLException sqlException = (SQLException)ex;
-      if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) {
-        // Manual retry exception was thrown
-        return true;
-      }
-      if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
-        //in MSSQL this means Communication Link Failure
-        return true;
-      }
-      if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
-        sqlException.getMessage().contains("consistent read failure; rollback 
data not available")) {
-        return true;
-      }
-
-      String regex = MetastoreConf.getVar(conf, 
ConfVars.TXN_RETRYABLE_SQLEX_REGEX);
-      if (regex != null && !regex.isEmpty()) {
-        String[] patterns = regex.split(",(?=\\S)");
-        String message = getMessage((SQLException)ex);
-        for (String p : patterns) {
-          if (Pattern.matches(p, message)) {
-            return true;
-          }
-        }
-      }
-      //see also https://issues.apache.org/jira/browse/HIVE-9938
-    }
-    return false;
-  }
-
-  private boolean isDuplicateKeyError(SQLException ex) {
-    return dbProduct.isDuplicateKeyError(ex);
-  }
-
-  private static String getMessage(SQLException ex) {
-    return ex.getMessage() + " (SQLState=" + ex.getSQLState() + ", ErrorCode=" 
+ ex.getErrorCode() + ")";
-  }
-  static String quoteString(String input) {
-    return "'" + input + "'";
-  }
-  static String quoteChar(char c) {
-    return "'" + c + "'";
-  }
-
-  /**
-   * {@link #lockInternal()} and {@link #unlockInternal()} are used to 
serialize those operations that require
-   * Select ... For Update to sequence operations properly.  In practice that 
means when running
-   * with Derby database.  See more notes at class level.
-   */
-  protected void lockInternal() {
-    if(dbProduct.isDERBY()) {
-      derbyLock.lock();
-    }
-  }
-  protected void unlockInternal() {
-    if(dbProduct.isDERBY()) {
-      derbyLock.unlock();
-    }
-  }
   @Override
   @RetrySemantics.Idempotent
   public MutexAPI getMutexAPI() {

Review Comment:
   removed



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6052,450 +1096,53 @@ protected synchronized static DataSource 
setupJdbcConnectionPool(Configuration c
         throw new RuntimeException(e);
       }
     } else {
-      String connectionPooler = MetastoreConf.getVar(conf, 
ConfVars.CONNECTION_POOLING_TYPE).toLowerCase();
+      String connectionPooler = MetastoreConf.getVar(conf, 
MetastoreConf.ConfVars.CONNECTION_POOLING_TYPE).toLowerCase();
       if ("none".equals(connectionPooler)) {
         LOG.info("Choosing not to pool JDBC connections");
-        return new NoPoolConnectionPool(conf);
+        return new NoPoolConnectionPool(conf, dbProduct);
       } else {
         throw new RuntimeException("Unknown JDBC connection pooling " + 
connectionPooler);
       }
     }
   }
 
-  /**
-   * Returns true if {@code ex} should be retried
-   */
-  static boolean isRetryable(Configuration conf, Exception ex) {
-    if(ex instanceof SQLException) {
-      SQLException sqlException = (SQLException)ex;
-      if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) {
-        // Manual retry exception was thrown
-        return true;
-      }
-      if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
-        //in MSSQL this means Communication Link Failure
-        return true;
-      }
-      if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
-        sqlException.getMessage().contains("consistent read failure; rollback 
data not available")) {
-        return true;
-      }
-
-      String regex = MetastoreConf.getVar(conf, 
ConfVars.TXN_RETRYABLE_SQLEX_REGEX);
-      if (regex != null && !regex.isEmpty()) {
-        String[] patterns = regex.split(",(?=\\S)");
-        String message = getMessage((SQLException)ex);
-        for (String p : patterns) {
-          if (Pattern.matches(p, message)) {
-            return true;
-          }
-        }
-      }
-      //see also https://issues.apache.org/jira/browse/HIVE-9938
-    }
-    return false;
-  }
-
-  private boolean isDuplicateKeyError(SQLException ex) {
-    return dbProduct.isDuplicateKeyError(ex);
-  }
-
-  private static String getMessage(SQLException ex) {
-    return ex.getMessage() + " (SQLState=" + ex.getSQLState() + ", ErrorCode=" 
+ ex.getErrorCode() + ")";
-  }
-  static String quoteString(String input) {
-    return "'" + input + "'";
-  }
-  static String quoteChar(char c) {
-    return "'" + c + "'";
-  }
-
-  /**
-   * {@link #lockInternal()} and {@link #unlockInternal()} are used to 
serialize those operations that require
-   * Select ... For Update to sequence operations properly.  In practice that 
means when running
-   * with Derby database.  See more notes at class level.
-   */
-  protected void lockInternal() {
-    if(dbProduct.isDERBY()) {
-      derbyLock.lock();
-    }
-  }
-  protected void unlockInternal() {
-    if(dbProduct.isDERBY()) {
-      derbyLock.unlock();
-    }
-  }
   @Override
   @RetrySemantics.Idempotent
   public MutexAPI getMutexAPI() {
-    return this;
+    return mutexAPI;
   }
 
   @Override
   public LockHandle acquireLock(String key) throws MetaException {
-    /**
-     * The implementation here is a bit kludgey but done so that code 
exercised by unit tests
-     * (which run against Derby which has no support for select for update) is 
as similar to
-     * production code as possible.
-     * In particular, with Derby we always run in a single process with a 
single metastore and
-     * the absence of For Update is handled via a Semaphore.  The latter would 
strictly speaking
-     * make the SQL statements below unnecessary (for Derby), but then they 
would not be tested.
-     */
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    boolean needToCloseConn = true;
-    try {
-      try {
-        String sqlStmt = sqlGenerator.addForUpdateClause("SELECT 
\"MT_COMMENT\", \"MT_KEY2\" FROM \"AUX_TABLE\" WHERE \"MT_KEY1\"=" + 
quoteString(key));
-        lockInternal();
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolMutex);
-        stmt = dbConn.createStatement();
-        LOG.debug("About to execute SQL: {}", sqlStmt);
-        rs = stmt.executeQuery(sqlStmt);
-        if (!rs.next()) {
-          close(rs);
-          try {
-            stmt.executeUpdate("INSERT INTO \"AUX_TABLE\" (\"MT_KEY1\", 
\"MT_KEY2\") VALUES(" + quoteString(key) + ", 0)");
-            dbConn.commit();
-          } catch (SQLException ex) {
-            if (!isDuplicateKeyError(ex)) {
-              throw new RuntimeException("Unable to lock " + quoteString(key) 
+ " due to: " + getMessage(ex), ex);
-            }
-            //if here, it means a concurrent acquireLock() inserted the 'key'
-
-            //rollback is done for the benefit of Postgres which throws 
(SQLState=25P02, ErrorCode=0) if
-            //you attempt any stmt in a txn which had an error.
-            dbConn.rollback();
-          }
-          rs = stmt.executeQuery(sqlStmt);
-          if (!rs.next()) {
-            throw new IllegalStateException("Unable to lock " + 
quoteString(key) + ".  Expected row in AUX_TABLE is missing.");
-          }
-        }
-        Semaphore derbySemaphore = null;
-        if(dbProduct.isDERBY()) {
-          derbyKey2Lock.putIfAbsent(key, new Semaphore(1));
-          derbySemaphore =  derbyKey2Lock.get(key);
-          derbySemaphore.acquire();
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("{} locked by {}", quoteString(key), 
quoteString(TxnHandler.hostname));
-        }
-        needToCloseConn = false;  //The connection is good, we need not close 
it
-        //OK, so now we have a lock
-        return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore);
-      } catch (SQLException ex) {
-        checkRetryable(ex, "acquireLock(" + key + ")");
-        throw new MetaException("Unable to lock " + quoteString(key) + " due 
to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex));
-      }
-      catch(InterruptedException ex) {
-        throw new MetaException("Unable to lock " + quoteString(key) + " due 
to: " + ex.getMessage() + StringUtils.stringifyException(ex));
-      }
-      finally {
-        if (needToCloseConn) {
-          rollbackDBConn(dbConn);
-          close(rs, stmt, dbConn);
-        }
-        unlockInternal();
-      }
-    }
-    catch(RetryException ex) {
-      return acquireLock(key);
-    }
+    return mutexAPI.acquireLock(key);
   }
 
   @Override
-  public void acquireLock(String key, LockHandle handle) {
-    //the idea is that this will use LockHandle.dbConn
-    throw new NotImplementedException("acquireLock(String, LockHandle) is not 
implemented");
+  public void acquireLock(String key, LockHandle handle) throws MetaException {
+    mutexAPI.acquireLock(key, handle);
   }
 
   /**
    * Acquire the global txn lock, used to mutex the openTxn and commitTxn.
    * @param shared either SHARED_READ or EXCLUSIVE
-   * @throws SQLException
    */
-  private void acquireTxnLock(Statement stmt, boolean shared) throws 
SQLException, MetaException {
+  private void acquireTxnLock(boolean shared) throws MetaException {
     String sqlStmt = sqlGenerator.createTxnLockStatement(shared);
-    stmt.execute(sqlStmt);
-    LOG.debug("TXN lock locked by {} in mode {}", 
quoteString(TxnHandler.hostname), shared);
-  }
-
-  private static final class LockHandleImpl implements LockHandle {
-    private final Connection dbConn;
-    private final Statement stmt;
-    private final ResultSet rs;
-    private final Semaphore derbySemaphore;
-    private final String key;
-    private final Long lastUpdateTime;
-
-    LockHandleImpl(Connection conn, Statement stmt, ResultSet rs, String key, 
Semaphore derbySemaphore) {
-      this.dbConn = conn;
-      this.stmt = stmt;
-      this.rs = rs;
-      this.derbySemaphore = derbySemaphore;
-      if(derbySemaphore != null) {
-        //otherwise it may later release permit acquired by someone else
-        assert derbySemaphore.availablePermits() == 0 : "Expected locked 
Semaphore";
-      }
-      this.key = key;
-      Long lastUpdateTime;
-      try {
-        lastUpdateTime = rs.getLong("MT_KEY2");
-      } catch (SQLException e) {
-        LOG.warn("Couldn't resolve MT_KEY2 for MT_KEY1=" + 
quoteString(this.key), e);
-        lastUpdateTime = -1L;
-      }
-      this.lastUpdateTime = lastUpdateTime;
-    }
-
-    @Override
-    public void releaseLocks() {
-      rollbackDBConn(dbConn);
-      TxnHandler.close(rs, stmt, dbConn);
-      if(derbySemaphore != null) {
-        derbySemaphore.release();
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{} unlocked by {}", quoteString(key), 
quoteString(TxnHandler.hostname));
-      }
-    }
-
-    @Override
-    public Long getLastUpdateTime() {
-      return lastUpdateTime;
-    }
-
-    @Override
-    public void releaseLocks(Long timestamp) {
-      try {
-        stmt.executeUpdate("UPDATE \"AUX_TABLE\" SET \"MT_KEY2\" = "+ 
timestamp + " WHERE \"MT_KEY1\"=" + quoteString(key));
-        dbConn.commit();
-      } catch (SQLException ex) {
-        LOG.warn("Unable to update MT_KEY2 value for MT_KEY1=" + key, ex);
-        rollbackDBConn(dbConn);
-      }
-      TxnHandler.close(rs, stmt, dbConn);
-      if(derbySemaphore != null) {
-        derbySemaphore.release();
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{} unlocked by {}", quoteString(key), 
quoteString(TxnHandler.hostname));
-      }
-    }
-
-    @Override
-    public void close() {
-      releaseLocks();
-    }
-
-  }
-
-
-  private static class NoPoolConnectionPool implements DataSource {
-    // Note that this depends on the fact that no-one in this class calls 
anything but
-    // getConnection.  If you want to use any of the Logger or wrap calls 
you'll have to
-    // implement them.
-    private final Configuration conf;
-    private Driver driver;
-    private String connString;
-    private String user;
-    private String passwd;
-
-    public NoPoolConnectionPool(Configuration conf) {
-      this.conf = conf;
-    }
-
-    @Override
-    public Connection getConnection() throws SQLException {
-      if (user == null) {
-        user = DataSourceProvider.getMetastoreJdbcUser(conf);
-        passwd = DataSourceProvider.getMetastoreJdbcPasswd(conf);
-      }
-      return getConnection(user, passwd);
-    }
-
-    @Override
-    public Connection getConnection(String username, String password) throws 
SQLException {
-      // Find the JDBC driver
-      if (driver == null) {
-        String driverName = MetastoreConf.getVar(conf, 
ConfVars.CONNECTION_DRIVER);
-        if (driverName == null || driverName.equals("")) {
-          String msg = "JDBC driver for transaction db not set in 
configuration " +
-              "file, need to set " + ConfVars.CONNECTION_DRIVER.getVarname();
-          LOG.error(msg);
-          throw new RuntimeException(msg);
-        }
-        try {
-          LOG.info("Going to load JDBC driver {}", driverName);
-          driver = (Driver) Class.forName(driverName).newInstance();
-        } catch (InstantiationException e) {
-          throw new RuntimeException("Unable to instantiate driver " + 
driverName + ", " +
-              e.getMessage(), e);
-        } catch (IllegalAccessException e) {
-          throw new RuntimeException(
-              "Unable to access driver " + driverName + ", " + e.getMessage(),
-              e);
-        } catch (ClassNotFoundException e) {
-          throw new RuntimeException("Unable to find driver " + driverName + 
", " + e.getMessage(),
-              e);
-        }
-        connString = MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY);
-      }
-
-      try {
-        LOG.info("Connecting to transaction db with connection string {}", 
connString);
-        Properties connectionProps = new Properties();
-        connectionProps.setProperty("user", username);
-        connectionProps.setProperty("password", password);
-        Connection conn = driver.connect(connString, connectionProps);
-        String prepareStmt = dbProduct != null ? dbProduct.getPrepareTxnStmt() 
: null;
-        if (prepareStmt != null) {
-          try (Statement stmt = conn.createStatement()) {
-            stmt.execute(prepareStmt);
-          }
-        }
-        conn.setAutoCommit(false);
-        return conn;
-      } catch (SQLException e) {
-        throw new RuntimeException("Unable to connect to transaction manager 
using " + connString
-            + ", " + e.getMessage(), e);
-      }
-    }
-
-    @Override
-    public PrintWriter getLogWriter() throws SQLException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void setLogWriter(PrintWriter out) throws SQLException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void setLoginTimeout(int seconds) throws SQLException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public int getLoginTimeout() throws SQLException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public java.util.logging.Logger getParentLogger() throws 
SQLFeatureNotSupportedException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public <T> T unwrap(Class<T> iface) throws SQLException {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean isWrapperFor(Class<?> iface) throws SQLException {
-      throw new UnsupportedOperationException();
-    }
+    jdbcResource.getJdbcTemplate().getJdbcTemplate().execute((Statement stmt) 
-> {
+        stmt.execute(sqlStmt);
+        return null;
+    });
+    LOG.debug("TXN lock locked by '{}' in mode {}", TxnHandler.hostname, 
shared);
   }
 
   @Override
   @RetrySemantics.SafeToRetry

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to