[ https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841358&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841358 ]
ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 24/Jan/23 11:18
Start Date: 24/Jan/23 11:18
Worklog Time Spent: 10m
Work Description: rkirtir commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1085143053
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such
Compaction Id Available","Error");
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+ abortCompaction(abortCompactionResponseElements,
eligibleCompactionsToAbort.get(x));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public void abortCompaction(Map<Long, AbortCompactionResponseElement>
abortCompactionResponseElements,
+ CompactionInfo compactionInfo) throws
MetaException {
+ try {
+ try (Connection dbConn =
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+ PreparedStatement pStmt =
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+ compactionInfo.state = TxnStore.ABORTED_STATE;
+ compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction
request.";
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo,
getDbTime(dbConn));
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:Unable to update compaction
record in COMPLETED_COMPACTIONS", "Error");
+ } else {
+ LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS",
updCount);
+ try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+ stmt.setLong(1, compactionInfo.id);
+ LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+ updCount = stmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction: Unable to update
compaction record in COMPACTION_QUEUE", "Error");
+ } else {
+ dbConn.commit();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id, "Successfully aborted compaction",
+ "Success");
+ }
+ } catch (SQLException e) {
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:"+e.getMessage(),
"Error");
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database: " +
e.getMessage());
+ checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:"+ e.getMessage(), "Error" );
+ }
+ } catch (RetryException e) {
+ abortCompaction(abortCompactionResponseElements, compactionInfo);
+ }
+ }
+
+ private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+ AbortCompactionResponseElement> abortCompactionResponseElements,
List<Long> requestedCompId) throws MetaException {
+
+ List<CompactionInfo> compactionInfoList = new ArrayList<>();
+ String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE
\"CC_ID\" IN " +
+ "(" + Joiner.on(',').join(requestedCompId) + ") ";
+ try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+ try (ResultSet rs = pStmt.executeQuery()) {
+ while (rs.next()) {
+ if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+
compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+ } else {
+ addAbortCompactionResponse(abortCompactionResponseElements,
rs.getLong(1),
+ "Error while aborting compaction as compaction is in state-" +
+ CompactionState.fromSqlConst(rs.getString(5).charAt(0)),
"Error");
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new MetaException("Unable to select from transaction database-" +
StringUtils.stringifyException(e));
+ }
+ return compactionInfoList;
+ }
+
+ private boolean checkIfCompactionEligibleToAbort(char state) {
+
+ return
CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+ }
+
+ private void addAbortCompactionResponse(Map<Long,
AbortCompactionResponseElement> abortCompactionResponseElements,
+ long id, String message, String
status) {
+
+ abortCompactionResponseElements.put(id, new
AbortCompactionResponseElement(id, status, message));
+ }
Review Comment:
fixed
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such
Compaction Id Available","Error");
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+ abortCompaction(abortCompactionResponseElements,
eligibleCompactionsToAbort.get(x));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public void abortCompaction(Map<Long, AbortCompactionResponseElement>
abortCompactionResponseElements,
Review Comment:
fixed
Issue Time Tracking
-------------------
Worklog Id: (was: 841358)
Time Spent: 3.5h (was: 3h 20m)
> Cancel Compactions in initiated state
> -------------------------------------
>
> Key: HIVE-26804
> URL: https://issues.apache.org/jira/browse/HIVE-26804
> Project: Hive
> Issue Type: New Feature
> Components: Hive
> Reporter: KIRTI RUGE
> Assignee: KIRTI RUGE
> Priority: Major
> Labels: pull-request-available
> Time Spent: 3.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)