deniskuzZ commented on code in PR #4313:
URL: https://github.com/apache/hive/pull/4313#discussion_r1205812270
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -1203,6 +1252,79 @@ private static class RetentionCounters {
}
}
+ static class TxnCleanupQueueHandler {
+
+ public static void removeRetryQueueEntries(CompactionTxnHandler txnHandler,
+ Connection dbConn,
CompactionInfo info) throws MetaException, RetryException {
+ PreparedStatement pStmt = null;
+ String query = "DELETE FROM \"TXN_CLEANUP_QUEUE\" WHERE \"TCQ_DATABASE\"
= ? " +
+ "AND \"TCQ_TABLE\" = ? AND (\"TCQ_PARTITION\" = ? OR
\"TCQ_PARTITION\" IS NULL)";
+ try {
+ LOG.debug("Going to execute update <{}>", query);
+ pStmt = dbConn.prepareStatement(query);
+ pStmt.setString(1, info.dbname);
+ pStmt.setString(2, info.tableName);
+ if (info.partName != null) {
+ pStmt.setString(3, info.partName);
+ } else {
+ // Since the type of 'TCQ_PARTITION' column is varchar.
+ // Hence, setting null for VARCHAR type.
+ pStmt.setNull(3, Types.VARCHAR);
+ }
+ int rc = pStmt.executeUpdate();
+ LOG.debug("Removed {} records in txn_cleanup_queue", rc);
+ } catch (SQLException e) {
+ LOG.error("Unable to delete from txn components due to {}",
e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ txnHandler.checkRetryable(e, "removeRetryQueueEntries(" + info + ")");
+ throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
+ } finally {
+ closeStmt(pStmt);
+ }
+ }
+
+ public static void setRetryRetentionTimeOnError(CompactionTxnHandler
txnHandler,
+ Connection dbConn,
AbortTxnRequestInfo info) throws MetaException, RetryException {
+ String query;
+ if (!info.hasRetryEntry) {
+ query = "INSERT INTO \"TXN_CLEANUP_QUEUE\" (\"TCQ_RETRY_RETENTION\",
\"TCQ_ERROR_MESSAGE\", " +
+ "\"TCQ_DATABASE\", \"TCQ_TABLE\", \"TCQ_PARTITION\",
\"TCQ_RETRY_TIME\") VALUES (?, ?, ?, ?, ?, " + getEpochFn(dbProduct) + ")";
+ } else {
+ query = "UPDATE \"TXN_CLEANUP_QUEUE\" SET \"TCQ_RETRY_RETENTION\" = ?,
\"TCQ_ERROR_MESSAGE\" = ? " +
+ "WHERE \"TCQ_DATABASE\" = ? AND \"TCQ_TABLE\" = ? AND
(\"TCQ_PARTITION\" = ? OR \"TCQ_PARTITION\" IS NULL)";
+ }
+ try (PreparedStatement stmt = dbConn.prepareStatement(query)) {
+ stmt.setLong(1, info.retryRetention);
+ stmt.setString(2, info.errorMessage);
+ stmt.setString(3, info.dbname);
+ stmt.setString(4, info.tableName);
+ if (info.partName != null) {
+ stmt.setString(5, info.partName);
+ } else {
+ // Since the type of 'TCQ_PARTITION' column is varchar.
+ // Hence, setting null for VARCHAR type.
+ stmt.setNull(5, Types.VARCHAR);
+ }
+ int updCnt = stmt.executeUpdate();
+ if (updCnt == 0) {
+ LOG.error("Unable to update compaction queue record: {}. updCnt={}",
info, updCnt);
+ dbConn.rollback();
+ throw new MetaException("No record with TCQ_DATABASE=" + info.dbname
+ ", TCQ_TABLE="
+ + info.tableName + ", TCQ_PARTITION" + info.partName + "
found in TXN_CLEANUP_QUEUE");
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to update compaction queue: " + e.getMessage());
+ rollbackDBConn(dbConn);
+ txnHandler.checkRetryable(e, "setCleanerRetryRetentionTimeOnError(" +
info + ")");
Review Comment:
handle this in a caller method or pass a predicate
````
e -> txnHandler.checkRetryable(e)
````
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]