rkirtir commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1085143053
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such
Compaction Id Available","Error");
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+ abortCompaction(abortCompactionResponseElements,
eligibleCompactionsToAbort.get(x));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public void abortCompaction(Map<Long, AbortCompactionResponseElement>
abortCompactionResponseElements,
+ CompactionInfo compactionInfo) throws
MetaException {
+ try {
+ try (Connection dbConn =
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+ PreparedStatement pStmt =
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+ compactionInfo.state = TxnStore.ABORTED_STATE;
+ compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction
request.";
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo,
getDbTime(dbConn));
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:Unable to update compaction
record in COMPLETED_COMPACTIONS", "Error");
+ } else {
+ LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS",
updCount);
+ try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+ stmt.setLong(1, compactionInfo.id);
+ LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+ updCount = stmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction: Unable to update
compaction record in COMPACTION_QUEUE", "Error");
+ } else {
+ dbConn.commit();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id, "Successfully aborted compaction",
+ "Success");
+ }
+ } catch (SQLException e) {
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:"+e.getMessage(),
"Error");
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database: " +
e.getMessage());
+ checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:"+ e.getMessage(), "Error" );
+ }
+ } catch (RetryException e) {
+ abortCompaction(abortCompactionResponseElements, compactionInfo);
+ }
+ }
+
+ private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+ AbortCompactionResponseElement> abortCompactionResponseElements,
List<Long> requestedCompId) throws MetaException {
+
+ List<CompactionInfo> compactionInfoList = new ArrayList<>();
+ String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE
\"CC_ID\" IN " +
+ "(" + Joiner.on(',').join(requestedCompId) + ") ";
+ try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+ try (ResultSet rs = pStmt.executeQuery()) {
+ while (rs.next()) {
+ if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+
compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+ } else {
+ addAbortCompactionResponse(abortCompactionResponseElements,
rs.getLong(1),
+ "Error while aborting compaction as compaction is in state-" +
+ CompactionState.fromSqlConst(rs.getString(5).charAt(0)),
"Error");
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new MetaException("Unable to select from transaction database-" +
StringUtils.stringifyException(e));
+ }
+ return compactionInfoList;
+ }
+
+ private boolean checkIfCompactionEligibleToAbort(char state) {
+
+ return
CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+ }
+
+ private void addAbortCompactionResponse(Map<Long,
AbortCompactionResponseElement> abortCompactionResponseElements,
+ long id, String message, String
status) {
+
+ abortCompactionResponseElements.put(id, new
AbortCompactionResponseElement(id, status, message));
+ }
Review Comment:
fixed
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such
Compaction Id Available","Error");
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+ abortCompaction(abortCompactionResponseElements,
eligibleCompactionsToAbort.get(x));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public void abortCompaction(Map<Long, AbortCompactionResponseElement>
abortCompactionResponseElements,
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]