veghlaci05 commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1293109026


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -1001,38 +261,22 @@ public void removeDuplicateCompletedTxnComponents() 
throws MetaException {
   @RetrySemantics.SafeToRetry
   public void cleanEmptyAbortedAndCommittedTxns() throws MetaException {
     LOG.info("Start to clean empty aborted or committed TXNS");
-    try {
-      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction)) {
-        try (PreparedStatement stmt = 
dbConn.prepareStatement(DELETE_FAILED_TXNS_SQL)) {
-          //Aborted and committed are terminal states, so nothing about the 
txn can change
-          //after that, so READ COMMITTED is sufficient.
-          /*
-           * Only delete aborted / committed transaction in a way that 
guarantees two things:
-           * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT 
window
-           * 2. never deletes the maximum txnId even if it is before the 
TXN_OPENTXN_TIMEOUT window
-           */
-          long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
-          stmt.setLong(1, lowWaterMark);
-          LOG.debug("Going to execute query <{}>", DELETE_FAILED_TXNS_SQL);
-          int rc = stmt.executeUpdate();
-          LOG.debug("Removed {} empty Aborted and Committed transactions from 
TXNS", rc);
-          LOG.debug("Going to commit");
-          dbConn.commit();
-        } catch (SQLException e) {
-          LOG.error("Unable to delete from txns table " + e.getMessage());
-          LOG.debug("Going to rollback");
-          rollbackDBConn(dbConn);
-          checkRetryable(e, "cleanEmptyAbortedTxns");
-          throw new MetaException("Unable to delete from txns table " + 
e.getMessage());
-        }
-      } catch (SQLException e) {
-        LOG.error(DB_FAILED_TO_CONNECT + e.getMessage());
-        checkRetryable(e, "cleanEmptyAbortedTxns");
-        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
-      }
-    } catch (RetryException e) {
-      cleanEmptyAbortedAndCommittedTxns();
-    }
+    //Aborted and committed are terminal states, so nothing about the txn can 
change
+    //after that, so READ COMMITTED is sufficient.
+    /*
+     * Only delete aborted / committed transaction in a way that guarantees 
two things:
+     * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window
+     * 2. never deletes the maximum txnId even if it is before the 
TXN_OPENTXN_TIMEOUT window
+     */
+    long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId();
+    String query = "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" NOT IN (SELECT 
\"TC_TXNID\" FROM \"TXN_COMPONENTS\") " +
+        "AND (\"TXN_STATE\" = :abortedState OR \"TXN_STATE\" = 
:committedState) AND \"TXN_ID\" < :txnId";
+
+    LOG.debug("Going to execute query <{}>", query);
+    dataSourceWrapper.getJdbcTemplate().update(query, new 
MapSqlParameterSource()

Review Comment:
   Sure, we can. For the very simple cases I did not create a separate class, 
but to be consistent, I can create one for these too.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -678,316 +215,39 @@ private void setCleanerStart(Connection dbConn, 
CompactionInfo info, Long timest
   @RetrySemantics.CannotRetry
   public void markCleaned(CompactionInfo info) throws MetaException {
     LOG.debug("Running markCleaned with CompactionInfo: {}", info);
-    try {
-      Connection dbConn = null;
-      PreparedStatement pStmt = null;
-      ResultSet rs = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
-        if (!info.isAbortedTxnCleanup()) {
-          String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", 
\"CC_DATABASE\", "
-              + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", 
\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
-              + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", 
\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
-              + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", 
\"CC_ENQUEUE_TIME\", "
-              + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", 
\"CC_INITIATOR_VERSION\", "
-              + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", 
\"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
-              + "\"CC_ORDER_BY\") "
-              + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", 
\"CQ_PARTITION\", "
-              + quoteChar(SUCCEEDED_STATE) + ", \"CQ_TYPE\", 
\"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", "
-              + getEpochFn(dbProduct) + ", \"CQ_RUN_AS\", 
\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
-              + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", 
\"CQ_ENQUEUE_TIME\", "
-              + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", 
\"CQ_INITIATOR_VERSION\", "
-              + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", 
\"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
-              + "\"CQ_ORDER_BY\" "
-              + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
-          pStmt = dbConn.prepareStatement(s);
-          pStmt.setLong(1, info.id);
-          LOG.debug("Going to execute update <{}> for CQ_ID={}", s, info.id);
-          pStmt.executeUpdate();
-        }
-
-        /* Remove compaction queue record corresponding to the compaction 
which has been successful as well as
-         * remove all abort retry associated metadata of table/partition in 
the COMPACTION_QUEUE both when compaction
-         * or abort cleanup is successful. We don't want a situation wherein 
we have an abort retry entry for a table
-         * but no corresponding entry in TXN_COMPONENTS table. Successful 
compaction will delete
-         * the retry metadata, so that abort cleanup is retried again (an 
optimistic retry approach).
-         */
-        removeCompactionAndAbortRetryEntries(dbConn, info);
-
-        if (!info.isAbortedTxnCleanup()) {
-          // Remove entries from completed_txn_components as well, so we don't 
start looking there
-          // again but only up to the highest write ID include in this 
compaction job.
-          //highestWriteId will be NULL in upgrade scenarios
-          String query =
-              "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" " +
-              " WHERE \"CTC_DATABASE\" = ? AND \"CTC_TABLE\" = ?";
-          if (info.partName != null) {
-            query += " AND \"CTC_PARTITION\" = ?";
-          }
-          if (info.highestWriteId != 0) {
-            query += " AND \"CTC_WRITEID\" <= ?";
-          }
-          pStmt = dbConn.prepareStatement(query);
-          int paramCount = 1;
-          pStmt.setString(paramCount++, info.dbname);
-          pStmt.setString(paramCount++, info.tableName);
-          if (info.partName != null) {
-            pStmt.setString(paramCount++, info.partName);
-          }
-          if (info.highestWriteId != 0) {
-            pStmt.setLong(paramCount, info.highestWriteId);
-          }
-          LOG.debug("Going to execute update <{}>", query);
-          int updCount = pStmt.executeUpdate();
-          if (updCount < 1) {
-            LOG.warn("Expected to remove at least one row from 
completed_txn_components when " +
-                    "marking compaction entry as clean!");
-          }
-          LOG.debug("Removed {} records from completed_txn_components", 
updCount);
-        }
-
-        // Do cleanup of metadata in TXN_COMPONENTS table.
-        removeTxnComponents(dbConn, info);
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Unable to delete from compaction queue " + e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "markCleaned(" + info + ")");
-        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
-      } finally {
-        close(rs, pStmt, dbConn);
-      }
-    } catch (RetryException e) {
-      markCleaned(info);
-    }
-  }
-
-  private void removeTxnComponents(Connection dbConn, CompactionInfo info) 
throws MetaException, RetryException {
-    PreparedStatement pStmt = null;
-    ResultSet rs = null;
-    try {
-      /*
-       * compaction may remove data from aborted txns above tc_writeid bit it 
only guarantees to
-       * remove it up to (inclusive) tc_writeid, so it's critical to not 
remove metadata about
-       * aborted TXN_COMPONENTS above tc_writeid (and consequently about 
aborted txns).
-       * See {@link ql.txn.compactor.Cleaner.removeFiles()}
-       */
-      String s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
-              + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + 
TxnStatus.ABORTED + ") "
-              + "AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? "
-              + "AND \"TC_PARTITION\" " + TxnUtils.nvl(info.partName);
-
-      List<String> queries = new ArrayList<>();
-      Iterator<Long> writeIdsIter = null;
-      List<Integer> counts = null;
-
-      if (info.writeIds != null && !info.writeIds.isEmpty()) {
-        StringBuilder prefix = new StringBuilder(s).append(" AND ");
-        List<String> questions = Collections.nCopies(info.writeIds.size(), 
"?");
-
-        counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix,
-                new StringBuilder(), questions, "\"TC_WRITEID\"", false, 
false);
-        writeIdsIter = info.writeIds.iterator();
-      } else if (!info.hasUncompactedAborts) {
-        if (info.highestWriteId != 0) {
-          s += " AND \"TC_WRITEID\" <= ?";
-        }
-        queries.add(s);
-      }
-
-      for (int i = 0; i < queries.size(); i++) {
-        String query = queries.get(i);
-        int writeIdCount = (counts != null) ? counts.get(i) : 0;
-
-        LOG.debug("Going to execute update <{}>", query);
-        pStmt = dbConn.prepareStatement(query);
-        int paramCount = 1;
-
-        pStmt.setString(paramCount++, info.dbname);
-        pStmt.setString(paramCount++, info.tableName);
-        if (info.partName != null) {
-          pStmt.setString(paramCount++, info.partName);
-        }
-        if (info.highestWriteId != 0 && writeIdCount == 0) {
-          pStmt.setLong(paramCount, info.highestWriteId);
-        }
-        for (int j = 0; j < writeIdCount; j++) {
-          if (writeIdsIter.hasNext()) {
-            pStmt.setLong(paramCount + j, writeIdsIter.next());
-          }
-        }
-        int rc = pStmt.executeUpdate();
-        LOG.debug("Removed {} records from txn_components", rc);
-      }
-    } catch (SQLException e) {
-      LOG.error("Unable to delete from txn components due to {}", 
e.getMessage());
-      LOG.debug("Going to rollback");
-      rollbackDBConn(dbConn);
-      checkRetryable(e, "markCleanedForAborts(" + info + ")");
-      throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
-    } finally {
-      close(rs);
-      closeStmt(pStmt);
-    }
+    new MarkCleanedFunction(info, dbProduct, conf).call(dataSourceWrapper);
   }
-
+  
   /**
    * Clean up entries from TXN_TO_WRITE_ID table less than 
min_uncommited_txnid as found by
    * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted 
TXNS.txn_id)).
    */
   @Override
   @RetrySemantics.SafeToRetry
   public void cleanTxnToWriteIdTable() throws MetaException {
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
-
-      try {
-        long minTxnIdSeenOpen = findMinTxnIdSeenOpen();
-
-        // We query for minimum values in all the queries and they can only 
increase by any concurrent
-        // operations. So, READ COMMITTED is sufficient.
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, 
connPoolCompaction);
-        stmt = dbConn.createStatement();
-
-        // First need to find the min_uncommitted_txnid which is currently 
seen by any open transactions.
-        // If there are no txns which are currently open or aborted in the 
system, then current value of
-        // max(TXNS.txn_id) could be min_uncommitted_txnid.
-        String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
-          " SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" +
-          (useMinHistoryLevel ? "" :
-          "   UNION" +
-          " SELECT MIN(\"WS_TXNID\") AS \"ID\" FROM \"WRITE_SET\"") +
-          "   UNION" +
-          " SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" 
= " + TxnStatus.ABORTED +
-          (useMinHistoryLevel ? "" :
-          "   OR \"TXN_STATE\" = " + TxnStatus.OPEN) +
-          " ) \"RES\"";
+    long minTxnIdSeenOpen = findMinTxnIdSeenOpen();
 
-        LOG.debug("Going to execute query <{}>", s);
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly 
initialized, no record found in TXNS");
-        }
-        long minUncommitedTxnid = Math.min(rs.getLong(1), minTxnIdSeenOpen);
-
-        // As all txns below min_uncommitted_txnid are either committed or 
empty_aborted, we are allowed
-        // to cleanup the entries less than min_uncommitted_txnid from the 
TXN_TO_WRITE_ID table.
-        s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + 
minUncommitedTxnid;
-        LOG.debug("Going to execute delete <{}>", s);
-        int rc = stmt.executeUpdate(s);
-        LOG.info("Removed {} rows from TXN_TO_WRITE_ID with Txn 
Low-Water-Mark: {}", rc, minUncommitedTxnid);
-
-        LOG.debug("Going to commit");
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Unable to delete from TXN_TO_WRITE_ID table " + 
e.getMessage());
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "cleanTxnToWriteIdTable");
-        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
-      } finally {
-        close(rs, stmt, dbConn);
-      }
-    } catch (RetryException e) {
-      cleanTxnToWriteIdTable();
+    // First need to find the min_uncommitted_txnid which is currently seen by 
any open transactions.
+    // If there are no txns which are currently open or aborted in the system, 
then current value of
+    // max(TXNS.txn_id) could be min_uncommitted_txnid.
+    Long minTxnId = new 
MinUncommittedTxnIdHandler(useMinHistoryLevel).execute(dataSourceWrapper);
+    if (minTxnId == null) {
+      throw new MetaException("Transaction tables not properly initialized, no 
record found in TXNS");
     }
+    long minUncommitedTxnid = Math.min(minTxnId, minTxnIdSeenOpen);
+
+    // As all txns below min_uncommitted_txnid are either committed or 
empty_aborted, we are allowed
+    // to clean up the entries less than min_uncommitted_txnid from the 
TXN_TO_WRITE_ID table.
+    String sql = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < 
:txnId";
+    LOG.debug("Going to execute delete <{}>", sql);
+    int rc = dataSourceWrapper.getJdbcTemplate().update(sql, new 
MapSqlParameterSource("txnId", minUncommitedTxnid));

Review Comment:
   Sure, we can. For the very simple cases I did not create a separate class, 
but to be consistent, I can create one for these too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to