veghlaci05 commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1088761769
##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3458,14 +3458,14 @@ public void testAbortCompaction() throws Exception {
runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today')
compact 'MAJOR'");
TestTxnCommands2.runWorker(hiveConf);
- runStatementOnDriver("drop table if exists T1");
- runStatementOnDriver("create table T1 (a int, b int) stored as orc
TBLPROPERTIES ('transactional'='true')");
- runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
- runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+ runStatementOnDriver("drop table if exists myT1");
+ runStatementOnDriver("create table myT1 (a int, b int) stored as orc
TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("insert into myT1 values(0,2)");//makes delta_1_1 in
T1
+ runStatementOnDriver("insert into myT1 values(1,4)");//makes delta_2_2 in
T2
//create failed compaction attempt so that compactor txn is aborted
HiveConf.setBoolVar(hiveConf,
HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
- runStatementOnDriver("alter table T1 compact 'minor'");
+ runStatementOnDriver("alter table myT1 compact 'minor'");
Review Comment:
Is this change intended? If yes, why?
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ abortCompactionResponseElements.put(x, new
AbortCompactionResponseElement(x, "Error",
+ "No Such Compaction Id Available"));
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+ compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
Review Comment:
You could replace it with
`for (CompactionInfo compactionInfo : eligibleCompactionsToAbort) {`
##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java:
##########
@@ -18,36 +18,70 @@
package org.apache.hadoop.hive.metastore.txn;
public class TxnQueries {
- public static final String SHOW_COMPACTION_ORDERBY_CLAUSE =
- " ORDER BY CASE " +
- " WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" > \"CC_COMMIT_TIME\" " +
- " THEN \"CC_END\" " +
- " WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " +
- " THEN \"CC_START\" " +
- " ELSE \"CC_COMMIT_TIME\" " +
- " END desc ," +
- " \"CC_ENQUEUE_TIME\" asc";
+ public static final String SHOW_COMPACTION_ORDERBY_CLAUSE =
+ " ORDER BY CASE " +
+ " WHEN \"CC_END\" > \"CC_START\" and \"CC_END\" >
\"CC_COMMIT_TIME\" " +
+ " THEN \"CC_END\" " +
+ " WHEN \"CC_START\" > \"CC_COMMIT_TIME\" " +
+ " THEN \"CC_START\" " +
+ " ELSE \"CC_COMMIT_TIME\" " +
+ " END desc ," +
+ " \"CC_ENQUEUE_TIME\" asc";
- public static final String SHOW_COMPACTION_QUERY =
- "SELECT XX.* FROM ( SELECT " +
- " \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS \"CC_TABLE\",
\"CQ_PARTITION\" AS \"CC_PARTITION\", " +
- " \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS \"CC_TYPE\",
\"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " +
- " \"CQ_START\" AS \"CC_START\", -1 \"CC_END\", \"CQ_RUN_AS\" AS
\"CC_RUN_AS\", " +
- " \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\" AS \"CC_ID\",
\"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " +
- " \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\" AS
\"CC_WORKER_VERSION\", " +
- " \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" AS
\"CC_INITIATOR_VERSION\", " +
- " \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\", \"CQ_POOL_NAME\" AS
\"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " +
- " \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", \"CQ_COMMIT_TIME\" AS
\"CC_COMMIT_TIME\", " +
- " \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " +
- "FROM " +
- " \"COMPACTION_QUEUE\" " +
- "UNION ALL " +
- "SELECT " +
- " \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\",
\"CC_TYPE\", \"CC_WORKER_ID\", " +
- " \"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HADOOP_JOB_ID\",
\"CC_ID\", \"CC_ERROR_MESSAGE\", " +
- " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\",
\"CC_INITIATOR_VERSION\", " +
- " -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\", \"CC_NEXT_TXN_ID\",
\"CC_COMMIT_TIME\", " +
- " \"CC_HIGHEST_WRITE_ID\"" +
- "FROM " +
- " \"COMPLETED_COMPACTIONS\" ) XX ";
+ public static final String SHOW_COMPACTION_QUERY =
+ "SELECT XX.* FROM ( SELECT " +
+ " \"CQ_DATABASE\" AS \"CC_DATABASE\", \"CQ_TABLE\" AS
\"CC_TABLE\", \"CQ_PARTITION\" AS \"CC_PARTITION\", " +
+ " \"CQ_STATE\" AS \"CC_STATE\", \"CQ_TYPE\" AS
\"CC_TYPE\", \"CQ_WORKER_ID\" AS \"CC_WORKER_ID\", " +
+ " \"CQ_START\" AS \"CC_START\", -1 \"CC_END\",
\"CQ_RUN_AS\" AS \"CC_RUN_AS\", " +
+ " \"CQ_HADOOP_JOB_ID\" AS \"CC_HADOOP_JOB_ID\", \"CQ_ID\"
AS \"CC_ID\", \"CQ_ERROR_MESSAGE\" AS \"CC_ERROR_MESSAGE\", " +
+ " \"CQ_ENQUEUE_TIME\" AS \"CC_ENQUEUE_TIME\",
\"CQ_WORKER_VERSION\" AS \"CC_WORKER_VERSION\", " +
+ " \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\",
\"CQ_INITIATOR_VERSION\" AS \"CC_INITIATOR_VERSION\", " +
+ " \"CQ_CLEANER_START\" AS \"CC_CLEANER_START\",
\"CQ_POOL_NAME\" AS \"CC_POOL_NAME\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " +
+ " \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\",
\"CQ_COMMIT_TIME\" AS \"CC_COMMIT_TIME\", " +
+ " \"CQ_HIGHEST_WRITE_ID\" AS \"CC_HIGHEST_WRITE_ID\" " +
+ "FROM " +
+ " \"COMPACTION_QUEUE\" " +
+ "UNION ALL " +
+ "SELECT " +
+ " \"CC_DATABASE\" , \"CC_TABLE\", \"CC_PARTITION\",
\"CC_STATE\", \"CC_TYPE\", \"CC_WORKER_ID\", " +
+ " \"CC_START\", \"CC_END\", \"CC_RUN_AS\",
\"CC_HADOOP_JOB_ID\", \"CC_ID\", \"CC_ERROR_MESSAGE\", " +
+ " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\",
\"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " +
+ " -1 , \"CC_POOL_NAME\", \"CC_TXN_ID\",
\"CC_NEXT_TXN_ID\", \"CC_COMMIT_TIME\", " +
+ " \"CC_HIGHEST_WRITE_ID\"" +
+ "FROM " +
+ " \"COMPLETED_COMPACTIONS\" ) XX ";
+
+
+ public static final String SELECT_COMPACTION_QUEUE_BY_COMPID =
Review Comment:
There is no need for aliases; you are accessing the data by index. If the
number and type of the columns are equal, the column names can differ.
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws
SQLException {
}
}
+ @Override
+ @RetrySemantics.SafeToRetry
+ public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst)
throws MetaException, NoSuchCompactionException {
+ Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements
= new HashMap<>();
+ AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+ response.setAbortedcompacts(abortCompactionResponseElements);
+
+ List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+ if (compactionIdsToAbort.isEmpty()) {
+ LOG.info("Compaction ids are missing in request. No compactions to
abort");
+ throw new NoSuchCompactionException("Compaction ids missing in request.
No compactions to abort");
+ }
+ reqst.getCompactionIds().forEach(x -> {
+ abortCompactionResponseElements.put(x, new
AbortCompactionResponseElement(x, "Error",
+ "No Such Compaction Id Available"));
+ });
+
+ List<CompactionInfo> eligibleCompactionsToAbort =
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+ compactionIdsToAbort);
+ for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+
abortCompactionResponseElements.put(eligibleCompactionsToAbort.get(x).id,
abortCompaction(eligibleCompactionsToAbort.get(x)));
+ }
+ return response;
+ }
+
+ @RetrySemantics.SafeToRetry
+ public AbortCompactionResponseElement abortCompaction(CompactionInfo
compactionInfo) throws MetaException {
+ try {
+ try (Connection dbConn =
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+ PreparedStatement pStmt =
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+ compactionInfo.state = TxnStore.ABORTED_STATE;
+ compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction
request.";
+ CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo,
getDbTime(dbConn));
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction:Unable to update
compaction record in COMPLETED_COMPACTIONS");
+ } else {
+ LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS",
updCount);
+ try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+ stmt.setLong(1, compactionInfo.id);
+ LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+ updCount = stmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to update compaction record: {}. updCnt={}",
compactionInfo, updCount);
+ dbConn.rollback();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction: Unable to
update compaction record in COMPACTION_QUEUE");
+ } else {
+ dbConn.commit();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Success", "Successfully aborted compaction");
+ }
+ } catch (SQLException e) {
+ dbConn.rollback();
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction:" +
e.getMessage());
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database: " +
e.getMessage());
+ checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+ return new AbortCompactionResponseElement(compactionInfo.id,
+ "Error", "Error while aborting compaction:" + e.getMessage());
+ }
+ } catch (RetryException e) {
+ return abortCompaction(compactionInfo);
+ }
+ }
+
+ private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+ AbortCompactionResponseElement> abortCompactionResponseElements,
List<Long> requestedCompId) throws MetaException {
+
+ List<CompactionInfo> compactionInfoList = new ArrayList<>();
+ String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE
\"CC_ID\" IN (?) " ;
+ String sqlIN = requestedCompId.stream()
+ .map(x -> String.valueOf(x))
+ .collect(Collectors.joining(",", "(", ")"));
+ queryText = queryText.replace("(?)", sqlIN);
Review Comment:
You could simply append the IN clause to the end of the query; there's no
need for replace() here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]