This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c3aed16 HIVE-25631: Initiator speed-up: only read compaction history
once per loop (Denys Kuzmenko, reviewed by Karen Coppage)
c3aed16 is described below
commit c3aed164ce56a01e00bec2943920f9fc92cac7f0
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed Nov 10 09:06:16 2021 +0100
HIVE-25631: Initiator speed-up: only read compaction history once per loop
(Denys Kuzmenko, reviewed by Karen Coppage)
Closes #2741
---
.../hadoop/hive/ql/txn/compactor/Initiator.java | 81 ++++++++++++++--------
1 file changed, 52 insertions(+), 29 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 1b3e964..adebd31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -63,8 +63,10 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
+import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -268,19 +270,51 @@ public class Initiator extends MetaStoreCompactorThread {
HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT,
TimeUnit.MILLISECONDS));
}
- // Figure out if there are any currently running compactions on the same
table or partition.
- private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
- CompactionInfo ci) {
- if (compactions.getCompacts() != null) {
- for (ShowCompactResponseElement e : compactions.getCompacts()) {
- if ((e.getState().equals(TxnStore.WORKING_RESPONSE) ||
e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
- e.getDbname().equals(ci.dbname) &&
- e.getTablename().equals(ci.tableName) &&
- (e.getPartitionname() == null && ci.partName == null ||
- e.getPartitionname().equals(ci.partName))) {
- return true;
- }
- }
+ private boolean foundCurrentOrFailedCompactions(ShowCompactResponse
compactions, CompactionInfo ci) throws MetaException {
+ if (compactions.getCompacts() == null) {
+ return false;
+ }
+ List<ShowCompactResponseElement> filteredElements =
compactions.getCompacts().stream()
+ .filter(e -> e.getDbname().equals(ci.dbname)
+ && e.getTablename().equals(ci.tableName)
+ && (e.getPartitionname() == null && ci.partName == null ||
e.getPartitionname().equals(ci.partName)))
+ .collect(Collectors.toList());
+
+ // Figure out if there are any currently running compactions on the same
table or partition.
+ if (filteredElements.stream().anyMatch(
+ e -> TxnStore.WORKING_RESPONSE.equals(e.getState()) ||
TxnStore.INITIATED_RESPONSE.equals(e.getState()))) {
+
+ LOG.info("Found currently initiated or working compaction for " +
+ ci.getFullPartitionName() + " so we will not initiate another
compaction");
+ return true;
+ }
+
+ // Check if there is already sufficient number of consecutive failures for
this table/partition
+ // so that no new automatic compactions needs to be scheduled.
+ int failedThreshold = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+
+ LongSummaryStatistics failedStats = filteredElements.stream()
+ .filter(e -> TxnStore.SUCCEEDED_RESPONSE.equals(e.getState()) ||
TxnStore.FAILED_RESPONSE.equals(e.getState()))
+
.sorted(Comparator.comparingLong(ShowCompactResponseElement::getId).reversed())
+ .limit(failedThreshold)
+
+ .filter(e -> TxnStore.FAILED_RESPONSE.equals(e.getState()))
+
.collect(Collectors.summarizingLong(ShowCompactResponseElement::getEnqueueTime));
+
+ // If the last attempt was too long ago, ignore the failed threshold and
try compaction again
+ long retryTime = MetastoreConf.getTimeVar(conf,
+ MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME,
TimeUnit.MILLISECONDS);
+
+ boolean needsRetry = (retryTime > 0) && (failedStats.getMax() + retryTime
< System.currentTimeMillis());
+ if (failedStats.getCount() == failedThreshold && !needsRetry) {
+ LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName()
+ " since last " +
+ MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + "
attempts to compact it failed.");
+
+ ci.errorMessage = "Compaction is not initiated since last " +
+ MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + "
consecutive compaction attempts failed)";
+
+ txnHandler.markFailed(ci);
+ return true;
}
return false;
}
@@ -469,14 +503,11 @@ public class Initiator extends MetaStoreCompactorThread {
LOG.info("Checking to see if we should compact " +
ci.getFullPartitionName());
- // Check if we already have initiated or are working on a compaction for
this partition
- // or table. If so, skip it. If we are just waiting on cleaning we can
still check,
- // as it may be time to compact again even though we haven't cleaned.
- // todo: this is not robust. You can easily run `alter table` to start a
compaction between
- // the time currentCompactions is generated and now
- if (lookForCurrentCompactions(currentCompactions, ci)) {
- LOG.info("Found currently initiated or working compaction for " +
- ci.getFullPartitionName() + " so we will not initiate another
compaction");
+ // Check if we have already initiated or are working on a compaction for
this table/partition.
+ // Also make sure we haven't exceeded configured number of consecutive
failures.
+ // If any of the above applies, skip it.
+ // Note: if we are just waiting on cleaning we can still check, as it
may be time to compact again even though we haven't cleaned.
+ if (foundCurrentOrFailedCompactions(currentCompactions, ci)) {
return false;
}
@@ -508,14 +539,6 @@ public class Initiator extends MetaStoreCompactorThread {
return false;
}
- if (txnHandler.checkFailedCompactions(ci)) {
- LOG.warn("Will not initiate compaction for " +
ci.getFullPartitionName() + " since last " +
- MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + "
attempts to compact it failed.");
- ci.errorMessage = "Compaction is not initiated since last " +
- MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + "
consecutive compaction attempts failed)";
- txnHandler.markFailed(ci);
- return false;
- }
} catch (Throwable e) {
LOG.error("Caught exception while checking compaction eligibility.", e);
try {