[
https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841971&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841971
]
ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Jan/23 10:15
Start Date: 27/Jan/23 10:15
Worklog Time Spent: 10m
Work Description: veghlaci05 commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1088761769
##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3458,14 +3458,14 @@ public void testAbortCompaction() throws Exception {
runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today')
compact 'MAJOR'");
TestTxnCommands2.runWorker(hiveConf);
- runStatementOnDriver("drop table if exists T1");
- runStatementOnDriver("create table T1 (a int, b int) stored as orc
TBLPROPERTIES ('transactional'='true')");
- runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
- runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+ runStatementOnDriver("drop table if exists myT1");
+ runStatementOnDriver("create table myT1 (a int, b int) stored as orc
TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("insert into myT1 values(0,2)");//makes delta_1_1 in
T1
+ runStatementOnDriver("insert into myT1 values(1,4)");//makes delta_2_2 in
T2
//create failed compaction attempt so that compactor txn is aborted
HiveConf.setBoolVar(hiveConf,
HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
- runStatementOnDriver("alter table T1 compact 'minor'");
+ runStatementOnDriver("alter table myT1 compact 'minor'");
Review Comment:
Is this change intended? If yes, why?
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ abortCompactionResponseElements.put(x, new
AbortCompactionResponseElement(x, "Error",
+ "No Such Compaction Id Available"));
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+ compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
Review Comment:
You could replace it with
`for (CompactionInfo compactionInfo : eligibleCompactionsToAbort) {`
##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java:
##########
@@ -18,36 +18,70 @@
package org.apache.hadoop.hive.metastore.txn;
public class TxnQueries {
- public static final String SHOW_COMPACTION_ORDERBY_CLAUSE =
- " ORDER BY CASE " +
- " WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" > \"CC_COMMIT_TIME\" " +
- " THEN \"CC_END\" " +
- " WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " +
- " THEN \"CC_START\" " +
- " ELSE \"CC_COMMIT_TIME\" " +
- " END desc ," +
- " \"CC_ENQUEUE_TIME\" asc";
+ public static final String SHOW_COMPACTION_ORDERBY_CLAUSE =
+ " ORDER BY CASE " +
+ " WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" >
\"CC_COMMIT_TIME\" " +
+ " THEN \"CC_END\" " +
+ " WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " +
+ " THEN \"CC_START\" " +
+ " ELSE \"CC_COMMIT_TIME\" " +
+ " END desc ," +
+ " \"CC_ENQUEUE_TIME\" asc";
- public static final String SHOW_COMPACTION_QUERY =
- "SELECT XX.* FROM ( SELECT " +
- " \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS \"CC_TABLE\",
\"CQ_PARTITION\" AS \"CC_PARTITION\", " +
- " \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS \"CC_TYPE\",
\"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " +
- " \"CQ_START\" AS \"CC_START\", -1 \"CC_END\", \"CQ_RUN_AS\" AS
\"CC_RUN_AS\", " +
- " \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\" AS \"CC_ID\",
\"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " +
- " \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\" AS
\"CC_WORKER_VERSION\", " +
- " \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" AS
\"CC_INITIATOR_VERSION\", " +
- " \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\", \"CQ_POOL_NAME\" AS
\"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " +
- " \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", \"CQ_COMMIT_TIME\" AS
\"CC_COMMIT_TIME\", " +
- " \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " +
- "FROM " +
- " \"COMPACTION_QUEUE\" " +
- "UNION ALL " +
- "SELECT " +
- " \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\",
\"CC_TYPE\", \"CC_WORKER_ID\", " +
- " \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\",
\"CC_ID\", \"CC_ERROR_MESSAGE\", " +
- " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\",
\"CC_INITIATOR_VERSION\", " +
- " -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\", \"CC_NEXT_TXN_ID\",
\"CC_COMMIT_TIME\", " +
- " \"CC_HIGHEST_WRITE_ID\"" +
- "FROM " +
- " \"COMPLETED_COMPACTIONS\" ) XX ";
+ public static final String SHOW_COMPACTION_QUERY =
+ "SELECT XX.* FROM ( SELECT " +
+ " \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS
\"CC_TABLE\", \"CQ_PARTITION\" AS \"CC_PARTITION\", " +
+ " \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS
\"CC_TYPE\", \"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " +
+ " \"CQ_START\" AS \"CC_START\", -1 \"CC_END\",
\"CQ_RUN_AS\" AS \"CC_RUN_AS\", " +
+ " \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\"
AS \"CC_ID\", \"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " +
+ " \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\",
\"CQ_WORKER_VERSION\" AS \"CC_WORKER_VERSION\", " +
+ " \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\",
\"CQ_INITIATOR_VERSION\" AS \"CC_INITIATOR_VERSION\", " +
+ " \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\",
\"CQ_POOL_NAME\" AS \"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " +
+ " \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\",
\"CQ_COMMIT_TIME\" AS \"CC_COMMIT_TIME\", " +
+ " \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " +
+ "FROM " +
+ " \"COMPACTION_QUEUE\" " +
+ "UNION ALL " +
+ "SELECT " +
+ " \"CC_DATABASE\" , \"CC_TABLE\", \"CC_PARTITION\",
\"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " +
+ " \"CC_START\", \"CC_END\", \"CC_RUN_AS\",
\"CC_HADOOP_JOB_ID\", \"CC_ID\", \"CC_ERROR_MESSAGE\", " +
+ " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\",
\"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " +
+ " -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\",
\"CC_NEXT_TXN_ID\", \"CC_COMMIT_TIME\", " +
+ " \"CC_HIGHEST_WRITE_ID\"" +
+ "FROM " +
+ " \"COMPLETED_COMPACTIONS\" ) XX ";
+
+
+ public static final String SELECT_COMPACTION_QUEUE_BY_COMPID =
Review Comment:
There is no need for aliases; you are accessing the data by indexes. If the
number and type of the columns are equal, the column names can differ.
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ abortCompactionResponseElements.put(x, new
AbortCompactionResponseElement(x, "Error",
+ "No Such Compaction Id Available"));
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+ compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+
abortCompactionResponseElements.put(eligibleCompactionsToAbort.get(x).id,
abortCompaction(eligibleCompactionsToAbort.get(x)));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public AbortCompactionResponseElement abortCompaction(CompactionInfo
compactionInfo) throws MetaException {
+ try {
+ try (Connection dbConn =
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+ PreparedStatement pStmt =
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+ compactionInfo.state = TxnStore.ABORTED_STATE;
+ compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction
request.";
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo,
getDbTime(dbConn));
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction:Unable to update
compaction record in COMPLETED_COMPACTIONS");
+ } else {
+ LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS",
updCount);
+ try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+ stmt.setLong(1, compactionInfo.id);
+ LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+ updCount = stmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction: Unable to
update compaction record in COMPACTION_QUEUE");
+ } else {
+ dbConn.commit();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Success", "Successfully aborted compaction");
+ }
+ } catch (SQLException e) {
+ dbConn.rollback();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction:" +
e.getMessage());
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database: " +
e.getMessage());
+ checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction:" + e.getMessage());
+ }
+ } catch (RetryException e) {
+ return abortCompaction(compactionInfo);
+ }
+ }
+
+ private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+ AbortCompactionResponseElement> abortCompactionResponseElements,
List<Long> requestedCompId) throws MetaException {
+
+ List<CompactionInfo> compactionInfoList = new ArrayList<>();
+ String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE
\"CC_ID\" IN (?) " ;
+ String sqlIN = requestedCompId.stream()
+ .map(x -> String.valueOf(x))
+ .collect(Collectors.joining(",", "(", ")"));
+ queryText = queryText.replace("(?)", sqlIN);
Review Comment:
You could simply append the IN clause to the end of the query; there is no
need for a replace here.
Issue Time Tracking
-------------------
Worklog Id: (was: 841971)
Time Spent: 4h 20m (was: 4h 10m)
> Cancel Compactions in initiated state
> -------------------------------------
>
> Key: HIVE-26804
> URL: https://issues.apache.org/jira/browse/HIVE-26804
> Project: Hive
> Issue Type: New Feature
> Components: Hive
> Reporter: KIRTI RUGE
> Assignee: KIRTI RUGE
> Priority: Major
> Labels: pull-request-available
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)