[ 
https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841358&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841358
 ]

ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jan/23 11:18
            Start Date: 24/Jan/23 11:18
    Worklog Time Spent: 10m 
      Work Description: rkirtir commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1085143053


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      addAbortCompactionResponse(abortCompactionResponseElements,x,  "No Such 
Compaction Id Available","Error");
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(abortCompactionResponseElements, 
eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(Map<Long, AbortCompactionResponseElement> 
abortCompactionResponseElements,
+                              CompactionInfo compactionInfo) throws 
MetaException {
+    try {
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = 
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        compactionInfo.state = TxnStore.ABORTED_STATE;
+        compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction 
request.";
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, 
getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+          dbConn.rollback();
+          addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                  "Error while aborting compaction:Unable to update compaction 
record in COMPLETED_COMPACTIONS", "Error");
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", 
updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                      "Error while aborting compaction: Unable to update 
compaction record in COMPACTION_QUEUE", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id, "Successfully aborted compaction",
+                      "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                    "Error while aborting compaction:"+e.getMessage(), 
"Error");
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + 
e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                "Error while aborting compaction:"+ e.getMessage(), "Error" );
+      }
+    } catch (RetryException e) {
+      abortCompaction(abortCompactionResponseElements, compactionInfo);
+    }
+  }
+
+  private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+          AbortCompactionResponseElement> abortCompactionResponseElements, 
List<Long> requestedCompId) throws MetaException {
+
+    List<CompactionInfo> compactionInfoList = new ArrayList<>();
+    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + "  WHERE 
\"CC_ID\" IN " +
+            "(" + Joiner.on(',').join(requestedCompId) + ") ";
+    try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+      try (ResultSet rs = pStmt.executeQuery()) {
+        while (rs.next()) {
+          if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+            
compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+          } else {
+            addAbortCompactionResponse(abortCompactionResponseElements, 
rs.getLong(1),
+              "Error while aborting compaction as compaction is in state-" +
+                      CompactionState.fromSqlConst(rs.getString(5).charAt(0)), 
"Error");
+          }
+        }
+      }
+    } catch (SQLException e) {
+      throw new MetaException("Unable to select from transaction database-" + 
StringUtils.stringifyException(e));
+    }
+    return compactionInfoList;
+  }
+
+  private boolean checkIfCompactionEligibleToAbort(char state) {
+
+    return 
CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+  }
+
+  private void addAbortCompactionResponse(Map<Long, 
AbortCompactionResponseElement> abortCompactionResponseElements,
+                                          long id, String message, String 
status) {
+
+    abortCompactionResponseElements.put(id, new 
AbortCompactionResponseElement(id, status, message));
+  }

Review Comment:
   Fixed.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      addAbortCompactionResponse(abortCompactionResponseElements,x,  "No Such 
Compaction Id Available","Error");
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(abortCompactionResponseElements, 
eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(Map<Long, AbortCompactionResponseElement> 
abortCompactionResponseElements,

Review Comment:
   Fixed.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 841358)
    Time Spent: 3.5h  (was: 3h 20m)

> Cancel Compactions in initiated state
> -------------------------------------
>
>                 Key: HIVE-26804
>                 URL: https://issues.apache.org/jira/browse/HIVE-26804
>             Project: Hive
>          Issue Type: New Feature
>          Components: Hive
>            Reporter: KIRTI RUGE
>            Assignee: KIRTI RUGE
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to