[
https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841972&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841972
]
ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jan/23 10:30
Start Date: 27/Jan/23 10:30
Worklog Time Spent: 10m
Work Description: rkirtir commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1088805034
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ abortCompactionResponseElements.put(x, new
AbortCompactionResponseElement(x, "Error",
+ "No Such Compaction Id Available"));
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+ compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+
abortCompactionResponseElements.put(eligibleCompactionsToAbort.get(x).id,
abortCompaction(eligibleCompactionsToAbort.get(x)));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public AbortCompactionResponseElement abortCompaction(CompactionInfo
compactionInfo) throws MetaException {
+ try {
+ try (Connection dbConn =
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+ PreparedStatement pStmt =
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+ compactionInfo.state = TxnStore.ABORTED_STATE;
+ compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction
request.";
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo,
getDbTime(dbConn));
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction:Unable to update
compaction record in COMPLETED_COMPACTIONS");
+ } else {
+ LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS",
updCount);
+ try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+ stmt.setLong(1, compactionInfo.id);
+ LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+ updCount = stmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction: Unable to
update compaction record in COMPACTION_QUEUE");
+ } else {
+ dbConn.commit();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Success", "Successfully aborted compaction");
+ }
+ } catch (SQLException e) {
+ dbConn.rollback();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction:" +
e.getMessage());
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database: " +
e.getMessage());
+ checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction:" + e.getMessage());
+ }
+ } catch (RetryException e) {
+ return abortCompaction(compactionInfo);
+ }
+ }
+
+ private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+ AbortCompactionResponseElement> abortCompactionResponseElements,
List<Long> requestedCompId) throws MetaException {
+
+ List<CompactionInfo> compactionInfoList = new ArrayList<>();
+ String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE
\"CC_ID\" IN (?) " ;
+ String sqlIN = requestedCompId.stream()
+ .map(x -> String.valueOf(x))
+ .collect(Collectors.joining(",", "(", ")"));
+ queryText = queryText.replace("(?)", sqlIN);
Review Comment:
There was code smell warning before for dynamic formating of sql
Issue Time Tracking
-------------------
Worklog Id: (was: 841972)
Time Spent: 4.5h (was: 4h 20m)
> Cancel Compactions in initiated state
> -------------------------------------
>
> Key: HIVE-26804
> URL: https://issues.apache.org/jira/browse/HIVE-26804
> Project: Hive
> Issue Type: New Feature
> Components: Hive
> Reporter: KIRTI RUGE
> Assignee: KIRTI RUGE
> Priority: Major
> Labels: pull-request-available
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)