veghlaci05 commented on code in PR #3880: URL: https://github.com/apache/hive/pull/3880#discussion_r1088761769
########## ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java: ########## @@ -3458,14 +3458,14 @@ public void testAbortCompaction() throws Exception { runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') compact 'MAJOR'"); TestTxnCommands2.runWorker(hiveConf); - runStatementOnDriver("drop table if exists T1"); - runStatementOnDriver("create table T1 (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); - runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1 - runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2 + runStatementOnDriver("drop table if exists myT1"); + runStatementOnDriver("create table myT1 (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("insert into myT1 values(0,2)");//makes delta_1_1 in T1 + runStatementOnDriver("insert into myT1 values(1,4)");//makes delta_2_2 in T2 //create failed compaction attempt so that compactor txn is aborted HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true); - runStatementOnDriver("alter table T1 compact 'minor'"); + runStatementOnDriver("alter table myT1 compact 'minor'"); Review Comment: Is this change intended? If yes, why? 
########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException { } } + @Override + @RetrySemantics.SafeToRetry + public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException { + Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>(); + AbortCompactResponse response = new AbortCompactResponse(new HashMap<>()); + response.setAbortedcompacts(abortCompactionResponseElements); + + List<Long> compactionIdsToAbort = reqst.getCompactionIds(); + if (compactionIdsToAbort.isEmpty()) { + LOG.info("Compaction ids are missing in request. No compactions to abort"); + throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort"); + } + reqst.getCompactionIds().forEach(x -> { + abortCompactionResponseElements.put(x, new AbortCompactionResponseElement(x, "Error", + "No Such Compaction Id Available")); + }); + + List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(abortCompactionResponseElements, + compactionIdsToAbort); + for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) { Review Comment: You could replace it with `for (CompactionInfo compactionInfo : eligibleCompactionsToAbort) {` ########## standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java: ########## @@ -18,36 +18,70 @@ package org.apache.hadoop.hive.metastore.txn; public class TxnQueries { - public static final String SHOW_COMPACTION_ORDERBY_CLAUSE = - " ORDER BY CASE " + - " WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" > \"CC_COMMIT_TIME\" " + - " THEN \"CC_END\" " + - " WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " + - " THEN \"CC_START\" " + - " ELSE \"CC_COMMIT_TIME\" " + - " END desc ," + - " \"CC_ENQUEUE_TIME\" asc"; + public static final String 
SHOW_COMPACTION_ORDERBY_CLAUSE = + " ORDER BY CASE " + + " WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" > \"CC_COMMIT_TIME\" " + + " THEN \"CC_END\" " + + " WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " + + " THEN \"CC_START\" " + + " ELSE \"CC_COMMIT_TIME\" " + + " END desc ," + + " \"CC_ENQUEUE_TIME\" asc"; - public static final String SHOW_COMPACTION_QUERY = - "SELECT XX.* FROM ( SELECT " + - " \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS \"CC_TABLE\", \"CQ_PARTITION\" AS \"CC_PARTITION\", " + - " \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS \"CC_TYPE\", \"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " + - " \"CQ_START\" AS \"CC_START\", -1 \"CC_END\", \"CQ_RUN_AS\" AS \"CC_RUN_AS\", " + - " \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\" AS \"CC_ID\", \"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " + - " \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\" AS \"CC_WORKER_VERSION\", " + - " \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" AS \"CC_INITIATOR_VERSION\", " + - " \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\", \"CQ_POOL_NAME\" AS \"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " + - " \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", \"CQ_COMMIT_TIME\" AS \"CC_COMMIT_TIME\", " + - " \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " + - "FROM " + - " \"COMPACTION_QUEUE\" " + - "UNION ALL " + - "SELECT " + - " \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " + - " \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\", \"CC_ID\", \"CC_ERROR_MESSAGE\", " + - " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " + - " -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\", \"CC_NEXT_TXN_ID\", \"CC_COMMIT_TIME\", " + - " \"CC_HIGHEST_WRITE_ID\"" + - "FROM " + - " \"COMPLETED_COMPACTIONS\" ) XX "; + public static final String SHOW_COMPACTION_QUERY = + "SELECT XX.* FROM ( SELECT " + + " \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS \"CC_TABLE\", 
\"CQ_PARTITION\" AS \"CC_PARTITION\", " + + " \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS \"CC_TYPE\", \"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " + + " \"CQ_START\" AS \"CC_START\", -1 \"CC_END\", \"CQ_RUN_AS\" AS \"CC_RUN_AS\", " + + " \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\" AS \"CC_ID\", \"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " + + " \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\" AS \"CC_WORKER_VERSION\", " + + " \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" AS \"CC_INITIATOR_VERSION\", " + + " \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\", \"CQ_POOL_NAME\" AS \"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " + + " \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", \"CQ_COMMIT_TIME\" AS \"CC_COMMIT_TIME\", " + + " \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " + + "FROM " + + " \"COMPACTION_QUEUE\" " + + "UNION ALL " + + "SELECT " + + " \"CC_DATABASE\" , \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " + + " \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\", \"CC_ID\", \"CC_ERROR_MESSAGE\", " + + " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " + + " -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\", \"CC_NEXT_TXN_ID\", \"CC_COMMIT_TIME\", " + + " \"CC_HIGHEST_WRITE_ID\"" + + "FROM " + + " \"COMPLETED_COMPACTIONS\" ) XX "; + + + public static final String SELECT_COMPACTION_QUEUE_BY_COMPID = Review Comment: There is no need for aliases, your are accessing the data by indexes. If the number and thype of the columns are equal, the coulmn names can differ. 
########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException { } } + @Override + @RetrySemantics.SafeToRetry + public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException { + Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>(); + AbortCompactResponse response = new AbortCompactResponse(new HashMap<>()); + response.setAbortedcompacts(abortCompactionResponseElements); + + List<Long> compactionIdsToAbort = reqst.getCompactionIds(); + if (compactionIdsToAbort.isEmpty()) { + LOG.info("Compaction ids are missing in request. No compactions to abort"); + throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort"); + } + reqst.getCompactionIds().forEach(x -> { + abortCompactionResponseElements.put(x, new AbortCompactionResponseElement(x, "Error", + "No Such Compaction Id Available")); + }); + + List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(abortCompactionResponseElements, + compactionIdsToAbort); + for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) { + abortCompactionResponseElements.put(eligibleCompactionsToAbort.get(x).id, abortCompaction(eligibleCompactionsToAbort.get(x))); + } + return response; + } + + @RetrySemantics.SafeToRetry + public AbortCompactionResponseElement abortCompaction(CompactionInfo compactionInfo) throws MetaException { + try { + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex); + PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) { + compactionInfo.state = TxnStore.ABORTED_STATE; + compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction request."; + CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, 
getDbTime(dbConn)); + int updCount = pStmt.executeUpdate(); + if (updCount != 1) { + LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount); + dbConn.rollback(); + return new AbortCompactionResponseElement(compactionInfo.id, + "Error", "Error while aborting compaction:Unable to update compaction record in COMPLETED_COMPACTIONS"); + } else { + LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount); + try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) { + stmt.setLong(1, compactionInfo.id); + LOG.debug("Going to execute update on COMPACTION_QUEUE "); + updCount = stmt.executeUpdate(); + if (updCount != 1) { + LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount); + dbConn.rollback(); + return new AbortCompactionResponseElement(compactionInfo.id, + "Error", "Error while aborting compaction: Unable to update compaction record in COMPACTION_QUEUE"); + } else { + dbConn.commit(); + return new AbortCompactionResponseElement(compactionInfo.id, + "Success", "Successfully aborted compaction"); + } + } catch (SQLException e) { + dbConn.rollback(); + return new AbortCompactionResponseElement(compactionInfo.id, + "Error", "Error while aborting compaction:" + e.getMessage()); + } + } + } catch (SQLException e) { + LOG.error("Unable to connect to transaction database: " + e.getMessage()); + checkRetryable(e, "abortCompaction(" + compactionInfo + ")"); + return new AbortCompactionResponseElement(compactionInfo.id, + "Error", "Error while aborting compaction:" + e.getMessage()); + } + } catch (RetryException e) { + return abortCompaction(compactionInfo); + } + } + + private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long, + AbortCompactionResponseElement> abortCompactionResponseElements, List<Long> requestedCompId) throws MetaException { + + List<CompactionInfo> compactionInfoList = new ArrayList<>(); + String queryText = 
TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE \"CC_ID\" IN (?) " ; + String sqlIN = requestedCompId.stream() + .map(x -> String.valueOf(x)) + .collect(Collectors.joining(",", "(", ")")); + queryText = queryText.replace("(?)", sqlIN); Review Comment: You could simply append the IN clause to the end of the query; there is no need for `replace` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org