[ 
https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=840962&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-840962
 ]

ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jan/23 06:18
            Start Date: 23/Jan/23 06:18
    Worklog Time Spent: 10m 
      Work Description: rkirtir commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1083679106


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6245,91 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new 
AbortCompactionResponseElement(x, "Error", "Not Eligible"));
+    });
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  private void addAbortCompactionResponse(long id, String message, String 
status) {
+    abortCompactionResponseElements.put(id, new 
AbortCompactionResponseElement(id, status, message));
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(CompactionInfo compactionInfo) throws 
MetaException {
+    try {
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = 
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, 
getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+          dbConn.rollback();
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", 
updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(compactionInfo.id, "Error while 
aborting compaction ", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(compactionInfo.id, "Successfully 
aborted compaction ", "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(compactionInfo.id, "Error while 
aborting compaction ", "Error");
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + 
e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        addAbortCompactionResponse(compactionInfo.id, "Error while aborting 
compaction ", "Error");

Review Comment:
   fixed



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6245,91 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new 
AbortCompactionResponseElement(x, "Error", "Not Eligible"));
+    });
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  private void addAbortCompactionResponse(long id, String message, String 
status) {
+    abortCompactionResponseElements.put(id, new 
AbortCompactionResponseElement(id, status, message));
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(CompactionInfo compactionInfo) throws 
MetaException {
+    try {
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = 
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, 
getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+          dbConn.rollback();
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", 
updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(compactionInfo.id, "Error while 
aborting compaction ", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(compactionInfo.id, "Successfully 
aborted compaction ", "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(compactionInfo.id, "Error while 
aborting compaction ", "Error");

Review Comment:
   fixed





Issue Time Tracking
-------------------

    Worklog Id:     (was: 840962)
    Time Spent: 2h 10m  (was: 2h)

> Cancel Compactions in initiated state
> -------------------------------------
>
>                 Key: HIVE-26804
>                 URL: https://issues.apache.org/jira/browse/HIVE-26804
>             Project: Hive
>          Issue Type: New Feature
>          Components: Hive
>            Reporter: KIRTI RUGE
>            Assignee: KIRTI RUGE
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to