rkirtir commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1085143309
########## ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3418,6 +3422,100 @@ public void testShowCompactionOrder() throws Exception {
     Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
   }
 
+  @Test
+  public void testAbortCompaction() throws Exception {
+
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    // generate some compaction history
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("create database mydb1");
+
+    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
+        BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+        " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+        " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+        " values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MINOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("create table mydb1.tbl1 " + "(a int, b int) partitioned by (ds string) clustered by (a) into " +
+        BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl1" + " PARTITION(ds) " +
+        " values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
+    runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("drop table if exists T1");
+    runStatementOnDriver("create table T1 (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into T1 values(0,2)"); // makes delta_1_1 in T1
+    runStatementOnDriver("insert into T1 values(1,4)"); // makes delta_2_2 in T1
+
+    // create failed compaction attempt so that compactor txn is aborted
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    runStatementOnDriver("alter table T1 compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    // Verify compaction order
+    List<ShowCompactResponseElement> compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(6, compacts.size());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, compacts.get(0).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(1).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(2).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(3).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(4).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
+
+    long initiatedStateCompId = compacts.get(0).getId();
+    List<Long> refusedStateCompIds = Arrays.asList(compacts.get(1).getId(), compacts.get(5).getId());
+    List<Long> compactionsToAbort = Arrays.asList(Long.parseLong("12"), compacts.get(0).getId(),
+        compacts.get(1).getId(), compacts.get(2).getId(), compacts.get(3).getId(), compacts.get(4).getId(),
+        compacts.get(5).getId());

Review Comment:
   fixed
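(The hunk as quoted cuts off before the abort call itself. For orientation, here is a minimal
sketch of how the collected ids would plausibly be fed into the new API. The request/response
accessors below are assumed Thrift-style bean methods, inferred from the getCompactionIds()
and setAbortedcompacts() calls in the TxnHandler hunk further down; they are not confirmed
signatures.)

    // Hypothetical continuation, not part of the quoted diff.
    AbortCompactionRequest rqst = new AbortCompactionRequest();
    rqst.setCompactionIds(compactionsToAbort);          // assumed bean setter
    AbortCompactResponse resp = txnHandler.abortCompactions(rqst);
    // Every requested id gets a response element: the unknown id (12) and the
    // non-INITIATED compactions come back as errors, while initiatedStateCompId
    // is the only one in the INITIATED state and therefore eligible to abort.
    Assert.assertEquals(compactionsToAbort.size(), resp.getAbortedcompacts().size());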
########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      addAbortCompactionResponse(abortCompactionResponseElements, x, "No Such Compaction Id Available", "Error");
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort =
+        findEligibleCompactionsToAbort(abortCompactionResponseElements, compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(abortCompactionResponseElements, eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements,
+      CompactionInfo compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        compactionInfo.state = TxnStore.ABORTED_STATE;
+        compactionInfo.errorMessage = "Compaction aborted by Abort Compaction request.";
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+          dbConn.rollback();
+          addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+              "Error while aborting compaction: Unable to update compaction record in COMPLETED_COMPACTIONS", "Error");
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                  "Error while aborting compaction: Unable to update compaction record in COMPACTION_QUEUE", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                  "Successfully aborted compaction", "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                "Error while aborting compaction: " + e.getMessage(), "Error");
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+            "Error while aborting compaction: " + e.getMessage(), "Error");
+      }
+    } catch (RetryException e) {
+      abortCompaction(abortCompactionResponseElements, compactionInfo);
+    }
+  }
+
+  private List<CompactionInfo> findEligibleCompactionsToAbort(
+      Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements,
+      List<Long> requestedCompId) throws MetaException {
+
+    List<CompactionInfo> compactionInfoList = new ArrayList<>();
+    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE \"CC_ID\" IN " +
+        "(" + Joiner.on(',').join(requestedCompId) + ") ";
+    try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+      try (ResultSet rs = pStmt.executeQuery()) {
+        while (rs.next()) {
+          if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+            compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+          } else {
+            addAbortCompactionResponse(abortCompactionResponseElements, rs.getLong(1),
+                "Error while aborting compaction as compaction is in state-" +
+                    CompactionState.fromSqlConst(rs.getString(5).charAt(0)), "Error");
+          }
+        }
+      }
+    } catch (SQLException e) {
+      throw new MetaException("Unable to select from transaction database-" + StringUtils.stringifyException(e));
+    }
+    return compactionInfoList;
+  }
+
+  private boolean checkIfCompactionEligibleToAbort(char state) {
+    return CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+  }

Review Comment:
   fixed
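(One detail the hunk above leaves implicit: findEligibleCompactionsToAbort builds the IN list
by splicing the ids into the SQL text with Joiner, even though the statement is prepared. The
ids arrive as longs from the Thrift request, so injection is not a practical risk here, but a
bound-parameter variant is straightforward. A sketch only, reusing the
TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID prefix and getDbConn from the hunk above:)

    // Sketch only: same query, with one bind parameter per requested id
    // instead of concatenating the ids into the SQL string.
    String placeholders = requestedCompId.stream()
        .map(id -> "?")
        .collect(java.util.stream.Collectors.joining(","));
    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID
        + " WHERE \"CC_ID\" IN (" + placeholders + ")";
    try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
      for (int i = 0; i < requestedCompId.size(); i++) {
        pStmt.setLong(i + 1, requestedCompId.get(i)); // JDBC parameters are 1-based
      }
      // ... executeQuery() and per-row eligibility handling as in the hunk above ...
    } catch (SQLException e) {
      throw new MetaException("Unable to select from transaction database-" + StringUtils.stringifyException(e));
    }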
updCnt={}", compactionInfo, updCount); + dbConn.rollback(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction: Unable to update compaction record in COMPACTION_QUEUE", "Error"); + } else { + dbConn.commit(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, "Successfully aborted compaction", + "Success"); + } + } catch (SQLException e) { + dbConn.rollback(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction:"+e.getMessage(), "Error"); + } + } + } catch (SQLException e) { + LOG.error("Unable to connect to transaction database: " + e.getMessage()); + checkRetryable(e, "abortCompaction(" + compactionInfo + ")"); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction:"+ e.getMessage(), "Error" ); + } + } catch (RetryException e) { + abortCompaction(abortCompactionResponseElements, compactionInfo); + } + } + + private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long, + AbortCompactionResponseElement> abortCompactionResponseElements, List<Long> requestedCompId) throws MetaException { + + List<CompactionInfo> compactionInfoList = new ArrayList<>(); + String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE \"CC_ID\" IN " + + "(" + Joiner.on(',').join(requestedCompId) + ") "; + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + PreparedStatement pStmt = dbConn.prepareStatement(queryText)) { + try (ResultSet rs = pStmt.executeQuery()) { + while (rs.next()) { + if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) { + compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs)); + } else { + addAbortCompactionResponse(abortCompactionResponseElements, rs.getLong(1), + "Error while aborting compaction as compaction is in state-" + + CompactionState.fromSqlConst(rs.getString(5).charAt(0)), "Error"); + } + } + } + } catch (SQLException e) { + throw new MetaException("Unable to select from transaction database-" + StringUtils.stringifyException(e)); + } + return compactionInfoList; + } + + private boolean checkIfCompactionEligibleToAbort(char state) { + + return CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state)); + } Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org For additional commands, e-mail: gitbox-h...@hive.apache.org