[
https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841150&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841150
]
ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Jan/23 15:02
Start Date: 23/Jan/23 15:02
Worklog Time Spent: 10m
Work Description: veghlaci05 commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1084128577
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such
Compaction Id Available","Error");
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+ abortCompaction(abortCompactionResponseElements,
eligibleCompactionsToAbort.get(x));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public void abortCompaction(Map<Long, AbortCompactionResponseElement>
abortCompactionResponseElements,
+ CompactionInfo compactionInfo) throws
MetaException {
+ try {
+ try (Connection dbConn =
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+ PreparedStatement pStmt =
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+ compactionInfo.state = TxnStore.ABORTED_STATE;
+ compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction
request.";
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo,
getDbTime(dbConn));
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:Unable to update compaction
record in COMPLETED_COMPACTIONS", "Error");
+ } else {
+ LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS",
updCount);
+ try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+ stmt.setLong(1, compactionInfo.id);
+ LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+ updCount = stmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction: Unable to update
compaction record in COMPACTION_QUEUE", "Error");
+ } else {
+ dbConn.commit();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id, "Successfully aborted compaction",
+ "Success");
+ }
+ } catch (SQLException e) {
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:"+e.getMessage(),
"Error");
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database: " +
e.getMessage());
+ checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:"+ e.getMessage(), "Error" );
+ }
+ } catch (RetryException e) {
+ abortCompaction(abortCompactionResponseElements, compactionInfo);
+ }
+ }
+
+ private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+ AbortCompactionResponseElement> abortCompactionResponseElements,
List<Long> requestedCompId) throws MetaException {
+
+ List<CompactionInfo> compactionInfoList = new ArrayList<>();
+ String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE
\"CC_ID\" IN " +
+ "(" + Joiner.on(',').join(requestedCompId) + ") ";
+ try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+ try (ResultSet rs = pStmt.executeQuery()) {
+ while (rs.next()) {
+ if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+
compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+ } else {
+ addAbortCompactionResponse(abortCompactionResponseElements,
rs.getLong(1),
+ "Error while aborting compaction as compaction is in state-" +
+ CompactionState.fromSqlConst(rs.getString(5).charAt(0)),
"Error");
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new MetaException("Unable to select from transaction database-" +
StringUtils.stringifyException(e));
+ }
+ return compactionInfoList;
+ }
+
+ private boolean checkIfCompactionEligibleToAbort(char state) {
+
+ return
CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+ }
Review Comment:
You could eliminate this method completely. It's a one-liner and there's
only one usage.
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such
Compaction Id Available","Error");
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+ abortCompaction(abortCompactionResponseElements,
eligibleCompactionsToAbort.get(x));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public void abortCompaction(Map<Long, AbortCompactionResponseElement>
abortCompactionResponseElements,
+ CompactionInfo compactionInfo) throws
MetaException {
+ try {
+ try (Connection dbConn =
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+ PreparedStatement pStmt =
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+ compactionInfo.state = TxnStore.ABORTED_STATE;
+ compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction
request.";
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo,
getDbTime(dbConn));
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:Unable to update compaction
record in COMPLETED_COMPACTIONS", "Error");
+ } else {
+ LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS",
updCount);
+ try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+ stmt.setLong(1, compactionInfo.id);
+ LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+ updCount = stmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction: Unable to update
compaction record in COMPACTION_QUEUE", "Error");
+ } else {
+ dbConn.commit();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id, "Successfully aborted compaction",
+ "Success");
+ }
+ } catch (SQLException e) {
+ dbConn.rollback();
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:"+e.getMessage(),
"Error");
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database: " +
e.getMessage());
+ checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+ addAbortCompactionResponse(abortCompactionResponseElements,
compactionInfo.id,
+ "Error while aborting compaction:"+ e.getMessage(), "Error" );
+ }
+ } catch (RetryException e) {
+ abortCompaction(abortCompactionResponseElements, compactionInfo);
+ }
+ }
+
+ private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+ AbortCompactionResponseElement> abortCompactionResponseElements,
List<Long> requestedCompId) throws MetaException {
+
+ List<CompactionInfo> compactionInfoList = new ArrayList<>();
+ String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE
\"CC_ID\" IN " +
+ "(" + Joiner.on(',').join(requestedCompId) + ") ";
+ try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+ try (ResultSet rs = pStmt.executeQuery()) {
+ while (rs.next()) {
+ if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+
compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+ } else {
+ addAbortCompactionResponse(abortCompactionResponseElements,
rs.getLong(1),
+ "Error while aborting compaction as compaction is in state-" +
+ CompactionState.fromSqlConst(rs.getString(5).charAt(0)),
"Error");
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new MetaException("Unable to select from transaction database-" +
StringUtils.stringifyException(e));
+ }
+ return compactionInfoList;
+ }
+
+ private boolean checkIfCompactionEligibleToAbort(char state) {
+
+ return
CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+ }
+
+ private void addAbortCompactionResponse(Map<Long,
AbortCompactionResponseElement> abortCompactionResponseElements,
+ long id, String message, String
status) {
+
+ abortCompactionResponseElements.put(id, new
AbortCompactionResponseElement(id, status, message));
+ }
Review Comment:
With the recent changes this method became a one-liner. You could eliminate
it completely.
##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3418,6 +3422,100 @@ public void testShowCompactionOrder() throws Exception {
Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
}
+
+ @Test
+ public void testAbortCompaction() throws Exception {
+
+ d.destroy();
+ hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+ d = new Driver(hiveConf);
+ //generate some compaction history
+ runStatementOnDriver("drop database if exists mydb1 cascade");
+ runStatementOnDriver("create database mydb1");
+
+ runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int)
partitioned by (p string) clustered by (a) into " +
+ BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES
('transactional'='true')");
+ runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+ "
values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+ runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1')
compact 'MAJOR'");
+ runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2')
compact 'MAJOR'");
+ TestTxnCommands2.runWorker(hiveConf);
+ TestTxnCommands2.runCleaner(hiveConf);
+ runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3')
compact 'MAJOR'");
+ runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+ "
values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+ TestTxnCommands2.runWorker(hiveConf);
+ TestTxnCommands2.runCleaner(hiveConf);
+ runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+ "
values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
+ runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1')
compact 'MINOR'");
+ TestTxnCommands2.runWorker(hiveConf);
+
+ runStatementOnDriver("create table mydb1.tbl1 " + "(a int, b int)
partitioned by (ds string) clustered by (a) into " +
+ BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES
('transactional'='true')");
+ runStatementOnDriver("insert into mydb1.tbl1" + " PARTITION(ds) " +
+ "
values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
+ runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today')
compact 'MAJOR'");
+ TestTxnCommands2.runWorker(hiveConf);
+
+ runStatementOnDriver("drop table if exists T1");
+ runStatementOnDriver("create table T1 (a int, b int) stored as orc
TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
+ runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+
+ //create failed compaction attempt so that compactor txn is aborted
+ HiveConf.setBoolVar(hiveConf,
HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+ runStatementOnDriver("alter table T1 compact 'minor'");
+ TestTxnCommands2.runWorker(hiveConf);
+ // Verify compaction order
+ List<ShowCompactResponseElement> compacts =
+ txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals(6, compacts.size());
+ Assert.assertEquals(TxnStore.INITIATED_RESPONSE,
compacts.get(0).getState());
+ Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(1).getState());
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE,
compacts.get(2).getState());
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE,
compacts.get(3).getState());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
compacts.get(4).getState());
+ Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
+
+ long initiatedStateCompId = compacts.get(0).getId();
+ List<Long> refusedStateCompIds =
Arrays.asList(compacts.get(1).getId(),compacts.get(5).getId());
+ List<Long> compactionsToAbort = Arrays.asList(Long.parseLong("12"),
compacts.get(0).getId(),
+ compacts.get(1).getId(), compacts.get(2).getId(),
compacts.get(3).getId(), compacts.get(4).getId(),
+ compacts.get(5).getId());
Review Comment:
You could use a single map here where the long is the id and the value is an
AbortCompactionResponseElement with the expected id, status, and message.
```
Map<Long, AbortCompactionResponseElement> map = new HashMap<Long,
AbortCompactionResponseElement>() {{
put(compacts.get(0).getId(),new
AbortCompactionResponseElement(compacts.get(0).getId(), "Success",
"Successfully aborted compaction"));
...
...
}};
```
That would greatly simplify the assertions below.
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such
Compaction Id Available","Error");
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+ abortCompaction(abortCompactionResponseElements,
eligibleCompactionsToAbort.get(x));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public void abortCompaction(Map<Long, AbortCompactionResponseElement>
abortCompactionResponseElements,
Review Comment:
You could return an AbortCompactionResponseElement instead of passing the
map.
Issue Time Tracking
-------------------
Worklog Id: (was: 841150)
Time Spent: 3h 20m (was: 3h 10m)
> Cancel Compactions in initiated state
> -------------------------------------
>
> Key: HIVE-26804
> URL: https://issues.apache.org/jira/browse/HIVE-26804
> Project: Hive
> Issue Type: New Feature
> Components: Hive
> Reporter: KIRTI RUGE
> Assignee: KIRTI RUGE
> Priority: Major
> Labels: pull-request-available
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)