deniskuzZ commented on code in PR #4566:
URL: https://github.com/apache/hive/pull/4566#discussion_r1419073617


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -5785,261 +1068,22 @@ private void timeOutLocks() {
    */
   @RetrySemantics.Idempotent
   public void performTimeOuts() {
-    jdbcResource.bindDataSource(POOL_TX);
-    try (TransactionContext context = 
jdbcResource.getTransactionManager().getTransaction(PROPAGATION_REQUIRED)) {
-      //We currently commit after selecting the TXNS to abort.  So whether 
SERIALIZABLE
-      //READ_COMMITTED, the effect is the same.  We could use FOR UPDATE on 
Select from TXNS
-      //and do the whole performTimeOuts() in a single huge transaction, but 
the only benefit
-      //would be to make sure someone cannot heartbeat one of these txns at 
the same time.
-      //The attempt to heartbeat would block and fail immediately after it's 
unblocked.
-      //With current (RC + multiple txns) implementation it is possible for 
someone to send
-      //heartbeat at the very end of the expiry interval, and just after the 
Select from TXNS
-      //is made, in which case heartbeat will succeed but txn will still be 
Aborted.
-      //Solving this corner case is not worth the perf penalty.  The client 
should heartbeat in a
-      //timely way.
-      timeOutLocks();
-      while (true) {
-        String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + 
TxnStatus.OPEN +
-            " AND (" +
-            "\"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue() +
-            " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + "-" + 
timeout +
-            " OR " +
-            " \"TXN_TYPE\" = " + TxnType.REPL_CREATED.getValue() +
-            " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + "-" + 
replicationTxnTimeout +
-            ")";
-        //safety valve for extreme cases
-        s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, 
s);
-
-        LOG.debug("Going to execute query <{}>", s);
-        List<List<Long>> timedOutTxns = 
jdbcResource.getJdbcTemplate().query(s, rs -> {
-          List<List<Long>> txnbatch = new ArrayList<>();
-          List<Long> currentBatch = new 
ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
-          while (rs.next()) {
-            currentBatch.add(rs.getLong(1));
-            if (currentBatch.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
-              txnbatch.add(currentBatch);
-              currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
-            }
-          }
-          if (currentBatch.size() > 0) {
-            txnbatch.add(currentBatch);
-          }
-          return txnbatch;
-        });
-        //noinspection DataFlowIssue
-        if (timedOutTxns.size() == 0) {
-          jdbcResource.getTransactionManager().commit(context);
-          return;
-        }
-
-        Object savePoint = context.getTransactionStatus().createSavepoint();
-
-        int numTxnsAborted = 0;
-        for (List<Long> batchToAbort : timedOutTxns) {
-          context.getTransactionStatus().releaseSavepoint(savePoint);
-          savePoint = context.getTransactionStatus().createSavepoint();
-          if (abortTxns(jdbcResource.getConnection(), batchToAbort, true, 
false, false, TxnErrorMsg.ABORT_TIMEOUT) == batchToAbort.size()) {
-            numTxnsAborted += batchToAbort.size();
-            //todo: add TXNS.COMMENT filed and set it to 'aborted by system 
due to timeout'
-          } else {
-            //could not abort all txns in this batch - this may happen because 
in parallel with this
-            //operation there was activity on one of the txns in this batch 
(commit/abort/heartbeat)
-            //This is not likely but may happen if client experiences long 
pause between heartbeats or
-            //unusually long/extreme pauses between heartbeat() calls and 
other logic in checkLock(),
-            //lock(), etc.
-            context.getTransactionStatus().rollbackToSavepoint(savePoint);
-          }
-        }
-        LOG.info("Aborted {} transaction(s) due to timeout", numTxnsAborted);
-        if (MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
-          
Metrics.getOrCreateCounter(MetricsConstants.TOTAL_NUM_TIMED_OUT_TXNS).inc(numTxnsAborted);
-        }
-      }
-    } catch (MetaException | SQLException e) {
-      LOG.warn("Aborting timed out transactions failed due to " + 
e.getMessage(), e);
-    } finally {
-      jdbcResource.unbindDataSource();
-    }
+    new PerformTimeoutsFunction(timeout, 
replicationTxnTimeout).execute(jdbcResource);
   }
 
   @Override
   @RetrySemantics.ReadOnly
   public void countOpenTxns() throws MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        String s = "SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = " + 
TxnStatus.OPEN;
-        LOG.debug("Going to execute query <{}>", s);
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          LOG.error("Transaction database not properly configured, can't find 
txn_state from TXNS.");
-        } else {
-          Long numOpen = rs.getLong(1);
-          if (numOpen > Integer.MAX_VALUE) {
-            LOG.error("Open transaction count above {}, can't count that 
high!", Integer.MAX_VALUE);
-          } else {
-            numOpenTxns.set(numOpen.intValue());
-          }
-        }
-      } catch (SQLException e) {
-        LOG.info("Failed to update number of open transactions");
-        checkRetryable(e, "countOpenTxns()");
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      countOpenTxns();
-    }
-  }
-
-  /**
-   * Add min history level entry for each generated txn record
-   * @param dbConn Connection
-   * @param txnIds new transaction ids
-   * @deprecated Remove this method when min_history_level table is dropped
-   * @throws SQLException ex
-   */
-  @Deprecated
-  private void addTxnToMinHistoryLevel(Connection dbConn, List<Long> txnIds, 
long minOpenTxnId) throws SQLException {
-    if (!useMinHistoryLevel) {
-      return;
-    }
-    // Need to register minimum open txnid for current transactions into 
MIN_HISTORY table.
-    try (Statement stmt = dbConn.createStatement()) {
-      List<String> rows = txnIds.stream().map(txnId -> txnId + ", " + 
minOpenTxnId).collect(Collectors.toList());
-
-      // Insert transaction entries into MIN_HISTORY_LEVEL.
-      List<String> inserts =
-          sqlGenerator.createInsertValuesStmt("\"MIN_HISTORY_LEVEL\" 
(\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows);
-      for (String insert : inserts) {
-        LOG.debug("Going to execute insert <{}>", insert);
-        stmt.execute(insert);
-      }
-      LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: ({}) with 
min_open_txn: {}", txnIds, minOpenTxnId);
-    } catch (SQLException e) {
-      if (dbProduct.isTableNotExistsError(e)) {
-        // If the table does not exists anymore, we disable the flag and start 
to work the new way
-        // This enables to switch to the new functionality without a restart
-        useMinHistoryLevel = false;
-      } else {
-        throw e;
-      }
+    int openTxns = jdbcResource.execute(new CountOpenTxnsHandler());
+    if (openTxns > -1) {
+      numOpenTxns.set(openTxns);
     }
   }
 
   @Override
   @RetrySemantics.SafeToRetry

Review Comment:
   Should we keep this? Please check the other methods as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to