veghlaci05 commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1088761769


##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3458,14 +3458,14 @@ public void testAbortCompaction() throws Exception {
     runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') 
compact 'MAJOR'");
     TestTxnCommands2.runWorker(hiveConf);
 
-    runStatementOnDriver("drop table if exists T1");
-    runStatementOnDriver("create table T1 (a int, b int) stored as orc 
TBLPROPERTIES ('transactional'='true')");
-    runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
-    runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+    runStatementOnDriver("drop table if exists myT1");
+    runStatementOnDriver("create table myT1 (a int, b int) stored as orc 
TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into myT1 values(0,2)");//makes delta_1_1 in 
T1
+    runStatementOnDriver("insert into myT1 values(1,4)");//makes delta_2_2 in 
T2
 
     //create failed compaction attempt so that compactor txn is aborted
     HiveConf.setBoolVar(hiveConf, 
HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
-    runStatementOnDriver("alter table T1 compact 'minor'");
+    runStatementOnDriver("alter table myT1 compact 'minor'");

Review Comment:
   Is this change intended? If yes, why?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new 
AbortCompactionResponseElement(x, "Error",
+              "No Such Compaction Id Available"));
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+            compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {

Review Comment:
   You could replace it with
   `for (CompactionInfo compactionInfo : eligibleCompactionsToAbort) {`



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java:
##########
@@ -18,36 +18,70 @@
 package org.apache.hadoop.hive.metastore.txn;
 
 public class TxnQueries {
-  public static final String SHOW_COMPACTION_ORDERBY_CLAUSE =
-    " ORDER BY CASE " +
-    "   WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" > \"CC_COMMIT_TIME\" " +
-    "     THEN \"CC_END\" " +
-    "   WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " +
-    "     THEN \"CC_START\" " +
-    "   ELSE \"CC_COMMIT_TIME\" " +
-    " END desc ," +
-    " \"CC_ENQUEUE_TIME\" asc";
+    public static final String SHOW_COMPACTION_ORDERBY_CLAUSE =
+            " ORDER BY CASE " +
+                    "   WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" > 
\"CC_COMMIT_TIME\" " +
+                    "     THEN \"CC_END\" " +
+                    "   WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " +
+                    "     THEN \"CC_START\" " +
+                    "   ELSE \"CC_COMMIT_TIME\" " +
+                    " END desc ," +
+                    " \"CC_ENQUEUE_TIME\" asc";
 
-  public static final String SHOW_COMPACTION_QUERY = 
-    "SELECT XX.* FROM ( SELECT " +
-    "  \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS \"CC_TABLE\", 
\"CQ_PARTITION\" AS \"CC_PARTITION\", " +
-    "  \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS \"CC_TYPE\", 
\"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " +
-    "  \"CQ_START\" AS \"CC_START\", -1 \"CC_END\", \"CQ_RUN_AS\" AS 
\"CC_RUN_AS\", " +
-    "  \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\" AS \"CC_ID\", 
\"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " +
-    "  \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\" AS 
\"CC_WORKER_VERSION\", " +
-    "  \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" AS 
\"CC_INITIATOR_VERSION\", " +
-    "  \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\", \"CQ_POOL_NAME\" AS 
\"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " +
-    "  \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", \"CQ_COMMIT_TIME\" AS 
\"CC_COMMIT_TIME\", " +
-    "  \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " +
-    "FROM " +
-    "  \"COMPACTION_QUEUE\" " +
-    "UNION ALL " +
-    "SELECT " +
-    "  \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", 
\"CC_TYPE\", \"CC_WORKER_ID\", " +
-    "  \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\", 
\"CC_ID\", \"CC_ERROR_MESSAGE\", " +
-    "  \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", 
\"CC_INITIATOR_VERSION\", " +
-    "   -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\", \"CC_NEXT_TXN_ID\", 
\"CC_COMMIT_TIME\", " +
-    "  \"CC_HIGHEST_WRITE_ID\"" +
-    "FROM " +
-    "  \"COMPLETED_COMPACTIONS\" ) XX ";
+    public static final String SHOW_COMPACTION_QUERY =
+            "SELECT XX.* FROM ( SELECT " +
+                    "  \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS 
\"CC_TABLE\", \"CQ_PARTITION\" AS \"CC_PARTITION\", " +
+                    "  \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS 
\"CC_TYPE\", \"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " +
+                    "  \"CQ_START\" AS \"CC_START\", -1 \"CC_END\", 
\"CQ_RUN_AS\" AS \"CC_RUN_AS\", " +
+                    "  \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\" 
AS \"CC_ID\", \"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " +
+                    "  \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\", 
\"CQ_WORKER_VERSION\" AS \"CC_WORKER_VERSION\", " +
+                    "  \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", 
\"CQ_INITIATOR_VERSION\" AS \"CC_INITIATOR_VERSION\", " +
+                    "  \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\", 
\"CQ_POOL_NAME\" AS \"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " +
+                    "  \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", 
\"CQ_COMMIT_TIME\" AS \"CC_COMMIT_TIME\", " +
+                    "  \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " +
+                    "FROM " +
+                    "  \"COMPACTION_QUEUE\" " +
+                    "UNION ALL " +
+                    "SELECT " +
+                    "  \"CC_DATABASE\" , \"CC_TABLE\", \"CC_PARTITION\", 
\"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " +
+                    "  \"CC_START\", \"CC_END\", \"CC_RUN_AS\", 
\"CC_HADOOP_JOB_ID\", \"CC_ID\", \"CC_ERROR_MESSAGE\", " +
+                    "  \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", 
\"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " +
+                    "   -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\", 
\"CC_NEXT_TXN_ID\", \"CC_COMMIT_TIME\", " +
+                    "  \"CC_HIGHEST_WRITE_ID\"" +
+                    "FROM " +
+                    "  \"COMPLETED_COMPACTIONS\" ) XX ";
+
+
+    public static final String SELECT_COMPACTION_QUEUE_BY_COMPID =

Review Comment:
   There is no need for aliases; you are accessing the data by indexes. If the 
number and type of the columns are equal, the column names can differ.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new 
AbortCompactionResponseElement(x, "Error",
+              "No Such Compaction Id Available"));
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+            compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      
abortCompactionResponseElements.put(eligibleCompactionsToAbort.get(x).id, 
abortCompaction(eligibleCompactionsToAbort.get(x)));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public AbortCompactionResponseElement abortCompaction(CompactionInfo 
compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = 
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        compactionInfo.state = TxnStore.ABORTED_STATE;
+        compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction 
request.";
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, 
getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+          dbConn.rollback();
+          return new AbortCompactionResponseElement(compactionInfo.id,
+                  "Error", "Error while aborting compaction:Unable to update 
compaction record in COMPLETED_COMPACTIONS");
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", 
updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+              dbConn.rollback();
+              return new AbortCompactionResponseElement(compactionInfo.id,
+                      "Error", "Error while aborting compaction: Unable to 
update compaction record in COMPACTION_QUEUE");
+            } else {
+              dbConn.commit();
+              return new AbortCompactionResponseElement(compactionInfo.id,
+                      "Success", "Successfully aborted compaction");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            return new AbortCompactionResponseElement(compactionInfo.id,
+                    "Error", "Error while aborting compaction:" + 
e.getMessage());
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + 
e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        return new AbortCompactionResponseElement(compactionInfo.id,
+                "Error", "Error while aborting compaction:" + e.getMessage());
+      }
+    } catch (RetryException e) {
+      return abortCompaction(compactionInfo);
+    }
+  }
+
+  private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+          AbortCompactionResponseElement> abortCompactionResponseElements, 
List<Long> requestedCompId) throws MetaException {
+
+    List<CompactionInfo> compactionInfoList = new ArrayList<>();
+    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + "  WHERE 
\"CC_ID\" IN (?) " ;
+    String sqlIN = requestedCompId.stream()
+            .map(x -> String.valueOf(x))
+            .collect(Collectors.joining(",", "(", ")"));
+    queryText = queryText.replace("(?)", sqlIN);

Review Comment:
   You could simply append the IN clause to the end of the query; there is no 
need for a replace here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to