veghlaci05 commented on code in PR #4566:
URL: https://github.com/apache/hive/pull/4566#discussion_r1410639712


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -248,199 +180,114 @@
  * the metstore call stack should have logic not to retry.  There are {@link 
RetrySemantics}
  * annotations to document the behavior.
  */
+@SuppressWarnings("SqlSourceToSinkFlow")
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   
-  private static final String TXN_TMP_STATE = "_";
-  private static final String DEFAULT_POOL_NAME = "default";
-
-  // Lock states
-  static final protected char LOCK_ACQUIRED = 'a';
-  static final protected char LOCK_WAITING = 'w';
-
-  private static final int ALLOWED_REPEATED_DEADLOCKS = 10;
   private static final Logger LOG = 
LoggerFactory.getLogger(TxnHandler.class.getName());
 
-  private static DataSource connPool;
-  private static DataSource connPoolMutex;
-
-  private static final String MANUAL_RETRY = "ManualRetry";
-
-  // Query definitions
-  private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO 
\"HIVE_LOCKS\" ( " +
-      "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", 
\"HL_TABLE\", \"HL_PARTITION\", " +
-      "\"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", 
\"HL_USER\", \"HL_HOST\", \"HL_AGENT_INFO\") " +
-      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, %s, ?, ?, ?)";
-  private static final String TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO 
\"TXN_COMPONENTS\" (" +
-      "\"TC_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", 
\"TC_OPERATION_TYPE\", \"TC_WRITEID\")" +
-      " VALUES (?, ?, ?, ?, ?, ?)";
-  private static final String TXN_COMPONENTS_DP_DELETE_QUERY = "DELETE FROM 
\"TXN_COMPONENTS\" " +
-      "WHERE \"TC_TXNID\" = ? AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? AND 
\"TC_PARTITION\" IS NULL";
-  private static final String INCREMENT_NEXT_LOCK_ID_QUERY = "UPDATE 
\"NEXT_LOCK_ID\" SET \"NL_NEXT\" = %s";
-  private static final String UPDATE_HIVE_LOCKS_EXT_ID_QUERY = "UPDATE 
\"HIVE_LOCKS\" SET \"HL_LOCK_EXT_ID\" = %s " +
-      "WHERE \"HL_LOCK_EXT_ID\" = %s";
-  private static final String SELECT_WRITE_ID_QUERY = "SELECT \"T2W_WRITEID\" 
FROM \"TXN_TO_WRITE_ID\" WHERE" +
-      " \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_TXNID\" = ?";
-  private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO 
\"COMPLETED_TXN_COMPONENTS\" " +
-      "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", 
\"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" +
-      " VALUES (%s, ?, ?, ?, ?, %s)";
-  private static final String TXNS_INSERT_QRY = "INSERT INTO \"TXNS\" " +
-      "(\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", 
\"TXN_HOST\", \"TXN_TYPE\") " +
-      "VALUES(?,%s,%s,?,?,?)";
-  private static final String SELECT_LOCKS_FOR_LOCK_ID_QUERY = "SELECT 
\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", " +
-      "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", 
\"HL_LOCK_TYPE\", \"HL_TXNID\" " +
-      "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
-  private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT 
\"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " +
-      "WHERE \"HL_LAST_HEARTBEAT\" < %s - :timeout AND \"HL_TXNID\" = 0";
-  private static final String TXN_TO_WRITE_ID_INSERT_QUERY = "INSERT INTO 
\"TXN_TO_WRITE_ID\" (\"T2W_TXNID\", " +
-      "\"T2W_DATABASE\", \"T2W_TABLE\", \"T2W_WRITEID\") VALUES (?, ?, ?, ?)";
-  private static final String MIN_HISTORY_WRITE_ID_INSERT_QUERY = "INSERT INTO 
\"MIN_HISTORY_WRITE_ID\" (\"MH_TXNID\", " +
-      "\"MH_DATABASE\", \"MH_TABLE\", \"MH_WRITEID\") VALUES (?, ?, ?, ?)";
-  private static final String SELECT_NWI_NEXT_FROM_NEXT_WRITE_ID =
-      "SELECT \"NWI_NEXT\" FROM \"NEXT_WRITE_ID\" WHERE \"NWI_DATABASE\" = ? 
AND \"NWI_TABLE\" = ?";
-  private static final String SELECT_METRICS_INFO_QUERY =
-      "SELECT * FROM (SELECT COUNT(*) FROM \"TXN_TO_WRITE_ID\") \"TTWID\" 
CROSS JOIN (" +
-      "SELECT COUNT(*) FROM \"COMPLETED_TXN_COMPONENTS\") \"CTC\" CROSS JOIN 
(" +
-      "SELECT COUNT(*), MIN(\"TXN_ID\"), ({0} - MIN(\"TXN_STARTED\"))/1000 
FROM \"TXNS\" WHERE \"TXN_STATE\"='" +
-        TxnStatus.OPEN + "' AND \"TXN_TYPE\" = "+ 
TxnType.REPL_CREATED.getValue() +") \"TR\" CROSS JOIN (" +
-      "SELECT COUNT(*), MIN(\"TXN_ID\"), ({0} - MIN(\"TXN_STARTED\"))/1000 
FROM \"TXNS\" WHERE \"TXN_STATE\"='" +
-        TxnStatus.OPEN + "' AND \"TXN_TYPE\" != "+ 
TxnType.REPL_CREATED.getValue() +") \"T\" CROSS JOIN (" +
-      "SELECT COUNT(*), MIN(\"TXN_ID\"), ({0} - MIN(\"TXN_STARTED\"))/1000 
FROM \"TXNS\" WHERE \"TXN_STATE\"='" +
-        TxnStatus.ABORTED + "') \"A\" CROSS JOIN (" +
-      "SELECT COUNT(*), ({0} - MIN(\"HL_ACQUIRED_AT\"))/1000 FROM 
\"HIVE_LOCKS\") \"HL\" CROSS JOIN (" +
-      "SELECT ({0} - MIN(\"CQ_COMMIT_TIME\"))/1000 from \"COMPACTION_QUEUE\" 
WHERE " +
-          "\"CQ_STATE\"=''" + READY_FOR_CLEANING + "'') OLDEST_CLEAN";
-  private static final String SELECT_TABLES_WITH_X_ABORTED_TXNS =
-      "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM 
\"TXN_COMPONENTS\" " +
-          "INNER JOIN \"TXNS\" ON \"TC_TXNID\" = \"TXN_ID\" WHERE 
\"TXN_STATE\" = " + TxnStatus.ABORTED +
-      " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" HAVING 
COUNT(\"TXN_ID\") > ?";
-  
-  private static final String  EXCL_CTAS_ERR_MSG = 
-      "Failed to initiate a concurrent CTAS operation with the same table 
name, lockInfo : %s";
-  private static final String ZERO_WAIT_READ_ERR_MSG = "Unable to acquire read 
lock due to an existing exclusive lock {%s}";
-
-
-  protected List<TransactionalMetaStoreEventListener> transactionalListeners;
-
   // Maximum number of open transactions that's allowed
   private static volatile int maxOpenTxns = 0;
   // Whether number of open transactions reaches the threshold
   private static volatile boolean tooManyOpenTxns = false;
+  // Current number of open txns
+  private static AtomicInteger numOpenTxns;
+
+  private static volatile boolean initialized = false;
+  private static DataSource connPool;
+  private static DataSource connPoolMutex;
+  protected static DataSource connPoolCompactor;
+
+  protected static DatabaseProduct dbProduct;
+  protected static SQLGenerator sqlGenerator;
+  protected static long openTxnTimeOutMillis;
 
   /**
    * Number of consecutive deadlocks we have seen
    */
-  private int deadlockCnt;
-  private long deadlockRetryInterval;
   protected Configuration conf;
-  protected static DatabaseProduct dbProduct;
-  protected static SQLGenerator sqlGenerator;
-  private static long openTxnTimeOutMillis;
 
+  protected List<TransactionalMetaStoreEventListener> transactionalListeners;
   // (End user) Transaction timeout, in milliseconds.
   private long timeout;
   private long replicationTxnTimeout;
 
-  private int maxBatchSize;
-  private String identifierQuoteString; // quotes to use for quoting tables, 
where necessary
-  private long retryInterval;
-  private int retryLimit;
-  private int retryNum;
-  // Current number of open txns
-  private AtomicInteger numOpenTxns;
-  // Whether to use min_history_level table or not.
-  // At startup we read it from the config, but set it to false if 
min_history_level does nto exists.
-  static boolean useMinHistoryLevel;
-  static boolean useMinHistoryWriteId;
-
-  private static SqlRetryHandler sqlRetryHandler;
-  protected static MultiDataSourceJdbcResource jdbcResource;
+  private MutexAPI mutexAPI;
+  private TxnLockHandler txnLockHandler;
+  private SqlRetryHandler sqlRetryHandler;
+  protected MultiDataSourceJdbcResource jdbcResource;
 
-  /**
-   * Derby specific concurrency control
-   */
-  private static final ReentrantLock derbyLock = new ReentrantLock(true);
-  /**
-   * must be static since even in UT there may be > 1 instance of TxnHandler
-   * (e.g. via Compactor services)
-   */
-  private final static ConcurrentHashMap<String, Semaphore> derbyKey2Lock = 
new ConcurrentHashMap<>();
   private static final String hostname = JavaUtils.hostname();
 
-  // Private methods should never catch SQLException and then throw 
MetaException.  The public
-  // methods depend on SQLException coming back so they can detect and handle 
deadlocks.  Private
-  // methods should only throw MetaException when they explicitly know there's 
a logic error and
-  // they want to throw past the public methods.
-  //
-  // All public methods that write to the database have to check for deadlocks 
when a SQLException
-  // comes back and handle it if they see one.  This has to be done with the 
connection pooling
-  // in mind.  To do this they should call checkRetryable() AFTER rolling back 
the db transaction,
-  // and then they should catch RetryException and call themselves 
recursively. See commitTxn for an example.
-
   public TxnHandler() {
   }
 
   /**
    * This is logically part of c'tor and must be called prior to any other 
method.
    * Not physically part of c'tor due to use of reflection
    */
-  public void setConf(Configuration conf){
+  public void setConf(Configuration conf) {
     this.conf = conf;
 
-    int maxPoolSize = MetastoreConf.getIntVar(conf, 
ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
-    synchronized (TxnHandler.class) {
-      try (DataSourceProvider.DataSourceNameConfigurator configurator =
-               new DataSourceProvider.DataSourceNameConfigurator(conf, 
POOL_TX)) {
-        if (connPool == null) {
-          connPool = setupJdbcConnectionPool(conf, maxPoolSize);
-        }
-        if (connPoolMutex == null) {
-          configurator.resetName(POOL_MUTEX);
-          connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize);
-        }
-      }
-      if (dbProduct == null) {
-        try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED)) {
-          determineDatabaseProduct(dbConn);
-        } catch (SQLException e) {
-          LOG.error("Unable to determine database product", e);
-          throw new RuntimeException(e);
+    if (!initialized) {
+      synchronized (TxnHandler.class) {
+        if (!initialized) {
+          try (DataSourceProvider.DataSourceNameConfigurator configurator =
+                   new DataSourceProvider.DataSourceNameConfigurator(conf, 
POOL_TX)) {
+            int maxPoolSize = MetastoreConf.getIntVar(conf, 
ConfVars.CONNECTION_POOLING_MAX_CONNECTIONS);
+            if (connPool == null) {
+              connPool = setupJdbcConnectionPool(conf, maxPoolSize);
+            }
+            if (connPoolMutex == null) {
+              configurator.resetName(POOL_MUTEX);
+              connPoolMutex = setupJdbcConnectionPool(conf, maxPoolSize);
+            }
+            if (connPoolCompactor == null) {
+              configurator.resetName(POOL_COMPACTOR);
+              connPoolCompactor = setupJdbcConnectionPool(conf, maxPoolSize);

Review Comment:
   Good catch! for compaction pool that value needs to be used. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to