deniskuzZ commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1337244926


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -5755,79 +5785,80 @@ private void timeOutLocks(Connection dbConn) {
    */
   @RetrySemantics.Idempotent
   public void performTimeOuts() {
-    Connection dbConn = null;
-    Statement stmt = null;
-    ResultSet rs = null;
-    try {
-      dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+    jdbcResource.bindDataSource(POOL_TX);
+    try (TransactionContext context = 
jdbcResource.getTransactionManager().getTransaction(PROPAGATION_REQUIRED)) {
       //We currently commit after selecting the TXNS to abort.  So whether 
SERIALIZABLE
       //READ_COMMITTED, the effect is the same.  We could use FOR UPDATE on 
Select from TXNS
       //and do the whole performTimeOuts() in a single huge transaction, but 
the only benefit
       //would be to make sure someone cannot heartbeat one of these txns at 
the same time.
       //The attempt to heartbeat would block and fail immediately after it's 
unblocked.
       //With current (RC + multiple txns) implementation it is possible for 
someone to send
-      //heartbeat at the very end of the expire interval, and just after the 
Select from TXNS
+      //heartbeat at the very end of the expiry interval, and just after the 
Select from TXNS
       //is made, in which case heartbeat will succeed but txn will still be 
Aborted.
       //Solving this corner case is not worth the perf penalty.  The client 
should heartbeat in a
       //timely way.
-      timeOutLocks(dbConn);
-      while(true) {
-        stmt = dbConn.createStatement();
+      timeOutLocks();
+      while (true) {
         String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + 
TxnStatus.OPEN +
-          " AND (" +
-                "\"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue() +
-                " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + 
"-" + timeout +
-             " OR " +
-                " \"TXN_TYPE\" = " + TxnType.REPL_CREATED.getValue() +
-                " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + 
"-" + replicationTxnTimeout +
-             ")";
+            " AND (" +
+            "\"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue() +
+            " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + "-" + 
timeout +
+            " OR " +
+            " \"TXN_TYPE\" = " + TxnType.REPL_CREATED.getValue() +
+            " AND \"TXN_LAST_HEARTBEAT\" <  " + getEpochFn(dbProduct) + "-" + 
replicationTxnTimeout +
+            ")";
         //safety valve for extreme cases
         s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, 
s);
+
         LOG.debug("Going to execute query <{}>", s);
-        rs = stmt.executeQuery(s);
-        if(!rs.next()) {
-          return;//no more timedout txns
-        }
-        List<List<Long>> timedOutTxns = new ArrayList<>();
-        List<Long> currentBatch = new 
ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
-        timedOutTxns.add(currentBatch);
-        do {
-          if(currentBatch.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
-            currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
-            timedOutTxns.add(currentBatch);
+        List<List<Long>> timedOutTxns = 
jdbcResource.getJdbcTemplate().query(s, rs -> {
+          List<List<Long>> txnbatch = new ArrayList<>();
+          List<Long> currentBatch = new 
ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+          while (rs.next()) {
+            currentBatch.add(rs.getLong(1));
+            if (currentBatch.size() == TIMED_OUT_TXN_ABORT_BATCH_SIZE) {
+              txnbatch.add(currentBatch);
+              currentBatch = new ArrayList<>(TIMED_OUT_TXN_ABORT_BATCH_SIZE);
+            }
           }
-          currentBatch.add(rs.getLong(1));
-        } while(rs.next());
-        dbConn.commit();
-        close(rs, stmt, null);
+          if (currentBatch.size() > 0) {
+            txnbatch.add(currentBatch);
+          }
+          return txnbatch;
+        });
+        //noinspection DataFlowIssue
+        if (timedOutTxns.size() == 0) {
+          jdbcResource.getTransactionManager().commit(context);

Review Comment:
   Why is this commit needed? Nothing has been changed at this point (the query returned no timed-out txns), so there is nothing to commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to