[ 
https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841971&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841971
 ]

ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Jan/23 10:15
            Start Date: 27/Jan/23 10:15
    Worklog Time Spent: 10m 
      Work Description: veghlaci05 commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1088761769


##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3458,14 +3458,14 @@ public void testAbortCompaction() throws Exception {
     runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') 
compact 'MAJOR'");
     TestTxnCommands2.runWorker(hiveConf);
 
-    runStatementOnDriver("drop table if exists T1");
-    runStatementOnDriver("create table T1 (a int, b int) stored as orc 
TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
-    runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+    runStatementOnDriver("drop table if exists myT1");
+    runStatementOnDriver("create table myT1 (a int, b int) stored as orc 
TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into myT1 values(0,2)");//makes delta_1_1 in 
T1
+    runStatementOnDriver("insert into myT1 values(1,4)");//makes delta_2_2 in 
T2
 
     //create failed compaction attempt so that compactor txn is aborted
     HiveConf.setBoolVar(hiveConf, 
HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
-    runStatementOnDriver("alter table T1 compact 'minor'");
+    runStatementOnDriver("alter table myT1 compact 'minor'");

Review Comment:
   Is this change intended? If yes, why?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new 
AbortCompactionResponseElement(x, "Error",
+              "No Such Compaction Id Available"));
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+            compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {

Review Comment:
   You could replace it with
   `for (CompactionInfo compactionInfo : eligibleCompactionsToAbort) {`



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java:
##########
@@ -18,36 +18,70 @@
 package org.apache.hadoop.hive.metastore.txn;
 
 public class TxnQueries {
-  public static final String SHOW_COMPACTION_ORDERBY_CLAUSE =
-    " ORDER BY CASE " +
-    "   WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" > \"CC_COMMIT_TIME\" " +
-    "     THEN \"CC_END\" " +
-    "   WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " +
-    "     THEN \"CC_START\" " +
-    "   ELSE \"CC_COMMIT_TIME\" " +
-    " END desc ," +
-    " \"CC_ENQUEUE_TIME\" asc";
+    public static final String SHOW_COMPACTION_ORDERBY_CLAUSE =
+            " ORDER BY CASE " +
+                    "   WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" > 
\"CC_COMMIT_TIME\" " +
+                    "     THEN \"CC_END\" " +
+                    "   WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " +
+                    "     THEN \"CC_START\" " +
+                    "   ELSE \"CC_COMMIT_TIME\" " +
+                    " END desc ," +
+                    " \"CC_ENQUEUE_TIME\" asc";
 
-  public static final String SHOW_COMPACTION_QUERY = 
-    "SELECT XX.* FROM ( SELECT " +
-    "  \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS \"CC_TABLE\", 
\"CQ_PARTITION\" AS \"CC_PARTITION\", " +
-    "  \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS \"CC_TYPE\", 
\"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " +
-    "  \"CQ_START\" AS \"CC_START\", -1 \"CC_END\", \"CQ_RUN_AS\" AS 
\"CC_RUN_AS\", " +
-    "  \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\" AS \"CC_ID\", 
\"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " +
-    "  \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\" AS 
\"CC_WORKER_VERSION\", " +
-    "  \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" AS 
\"CC_INITIATOR_VERSION\", " +
-    "  \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\", \"CQ_POOL_NAME\" AS 
\"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " +
-    "  \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", \"CQ_COMMIT_TIME\" AS 
\"CC_COMMIT_TIME\", " +
-    "  \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " +
-    "FROM " +
-    "  \"COMPACTION_QUEUE\" " +
-    "UNION ALL " +
-    "SELECT " +
-    "  \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", 
\"CC_TYPE\", \"CC_WORKER_ID\", " +
-    "  \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\", 
\"CC_ID\", \"CC_ERROR_MESSAGE\", " +
-    "  \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", 
\"CC_INITIATOR_VERSION\", " +
-    "   -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\", \"CC_NEXT_TXN_ID\", 
\"CC_COMMIT_TIME\", " +
-    "  \"CC_HIGHEST_WRITE_ID\"" +
-    "FROM " +
-    "  \"COMPLETED_COMPACTIONS\" ) XX ";
+    public static final String SHOW_COMPACTION_QUERY =
+            "SELECT XX.* FROM ( SELECT " +
+                    "  \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS 
\"CC_TABLE\", \"CQ_PARTITION\" AS \"CC_PARTITION\", " +
+                    "  \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS 
\"CC_TYPE\", \"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " +
+                    "  \"CQ_START\" AS \"CC_START\", -1 \"CC_END\", 
\"CQ_RUN_AS\" AS \"CC_RUN_AS\", " +
+                    "  \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\" 
AS \"CC_ID\", \"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " +
+                    "  \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\", 
\"CQ_WORKER_VERSION\" AS \"CC_WORKER_VERSION\", " +
+                    "  \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", 
\"CQ_INITIATOR_VERSION\" AS \"CC_INITIATOR_VERSION\", " +
+                    "  \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\", 
\"CQ_POOL_NAME\" AS \"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " +
+                    "  \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", 
\"CQ_COMMIT_TIME\" AS \"CC_COMMIT_TIME\", " +
+                    "  \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " +
+                    "FROM " +
+                    "  \"COMPACTION_QUEUE\" " +
+                    "UNION ALL " +
+                    "SELECT " +
+                    "  \"CC_DATABASE\" , \"CC_TABLE\", \"CC_PARTITION\", 
\"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " +
+                    "  \"CC_START\", \"CC_END\", \"CC_RUN_AS\", 
\"CC_HADOOP_JOB_ID\", \"CC_ID\", \"CC_ERROR_MESSAGE\", " +
+                    "  \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", 
\"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " +
+                    "   -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\", 
\"CC_NEXT_TXN_ID\", \"CC_COMMIT_TIME\", " +
+                    "  \"CC_HIGHEST_WRITE_ID\"" +
+                    "FROM " +
+                    "  \"COMPLETED_COMPACTIONS\" ) XX ";
+
+
+    public static final String SELECT_COMPACTION_QUEUE_BY_COMPID =

Review Comment:
   There is no need for aliases; you are accessing the data by index. If the 
number and types of the columns are equal, the column names can differ.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new 
AbortCompactionResponseElement(x, "Error",
+              "No Such Compaction Id Available"));
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+            compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      
abortCompactionResponseElements.put(eligibleCompactionsToAbort.get(x).id, 
abortCompaction(eligibleCompactionsToAbort.get(x)));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public AbortCompactionResponseElement abortCompaction(CompactionInfo 
compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = 
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        compactionInfo.state = TxnStore.ABORTED_STATE;
+        compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction 
request.";
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, 
getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+          dbConn.rollback();
+          return new AbortCompactionResponseElement(compactionInfo.id,
+                  "Error", "Error while aborting compaction:Unable to update 
compaction record in COMPLETED_COMPACTIONS");
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", 
updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+              dbConn.rollback();
+              return new AbortCompactionResponseElement(compactionInfo.id,
+                      "Error", "Error while aborting compaction: Unable to 
update compaction record in COMPACTION_QUEUE");
+            } else {
+              dbConn.commit();
+              return new AbortCompactionResponseElement(compactionInfo.id,
+                      "Success", "Successfully aborted compaction");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            return new AbortCompactionResponseElement(compactionInfo.id,
+                    "Error", "Error while aborting compaction:" + 
e.getMessage());
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + 
e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        return new AbortCompactionResponseElement(compactionInfo.id,
+                "Error", "Error while aborting compaction:" + e.getMessage());
+      }
+    } catch (RetryException e) {
+      return abortCompaction(compactionInfo);
+    }
+  }
+
+  private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+          AbortCompactionResponseElement> abortCompactionResponseElements, 
List<Long> requestedCompId) throws MetaException {
+
+    List<CompactionInfo> compactionInfoList = new ArrayList<>();
+    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + "  WHERE 
\"CC_ID\" IN (?) " ;
+    String sqlIN = requestedCompId.stream()
+            .map(x -> String.valueOf(x))
+            .collect(Collectors.joining(",", "(", ")"));
+    queryText = queryText.replace("(?)", sqlIN);

Review Comment:
   You could simply append the IN clause to the end of the query; there's no 
need for `replace` here.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 841971)
    Time Spent: 4h 20m  (was: 4h 10m)

> Cancel Compactions in initiated state
> -------------------------------------
>
>                 Key: HIVE-26804
>                 URL: https://issues.apache.org/jira/browse/HIVE-26804
>             Project: Hive
>          Issue Type: New Feature
>          Components: Hive
>            Reporter: KIRTI RUGE
>            Assignee: KIRTI RUGE
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to