This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c3aed16  HIVE-25631: Initiator speed-up: only read compaction history once per loop (Denys Kuzmenko, reviewed by Karen Coppage)
c3aed16 is described below

commit c3aed164ce56a01e00bec2943920f9fc92cac7f0
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed Nov 10 09:06:16 2021 +0100

    HIVE-25631: Initiator speed-up: only read compaction history once per loop (Denys Kuzmenko, reviewed by Karen Coppage)
    
    Closes #2741
---
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 81 ++++++++++++++--------
 1 file changed, 52 insertions(+), 29 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 1b3e964..adebd31 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -63,8 +63,10 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
+import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -268,19 +270,51 @@ public class Initiator extends MetaStoreCompactorThread {
         HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 
TimeUnit.MILLISECONDS));
   }
 
-  // Figure out if there are any currently running compactions on the same 
table or partition.
-  private boolean lookForCurrentCompactions(ShowCompactResponse compactions,
-                                            CompactionInfo ci) {
-    if (compactions.getCompacts() != null) {
-      for (ShowCompactResponseElement e : compactions.getCompacts()) {
-         if ((e.getState().equals(TxnStore.WORKING_RESPONSE) || 
e.getState().equals(TxnStore.INITIATED_RESPONSE)) &&
-            e.getDbname().equals(ci.dbname) &&
-            e.getTablename().equals(ci.tableName) &&
-            (e.getPartitionname() == null && ci.partName == null ||
-                  e.getPartitionname().equals(ci.partName))) {
-          return true;
-        }
-      }
+  private boolean foundCurrentOrFailedCompactions(ShowCompactResponse 
compactions, CompactionInfo ci) throws MetaException {
+    if (compactions.getCompacts() == null) {
+      return false;
+    }
+    List<ShowCompactResponseElement> filteredElements = 
compactions.getCompacts().stream()
+      .filter(e -> e.getDbname().equals(ci.dbname)
+        && e.getTablename().equals(ci.tableName)
+        && (e.getPartitionname() == null && ci.partName == null || 
e.getPartitionname().equals(ci.partName)))
+      .collect(Collectors.toList());
+
+    // Figure out if there are any currently running compactions on the same 
table or partition.
+    if (filteredElements.stream().anyMatch(
+        e -> TxnStore.WORKING_RESPONSE.equals(e.getState()) || 
TxnStore.INITIATED_RESPONSE.equals(e.getState()))) {
+
+      LOG.info("Found currently initiated or working compaction for " +
+        ci.getFullPartitionName() + " so we will not initiate another 
compaction");
+      return true;
+    }
+
+    // Check if there is already sufficient number of consecutive failures for 
this table/partition
+    // so that no new automatic compactions needs to be scheduled.
+    int failedThreshold = MetastoreConf.getIntVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+
+    LongSummaryStatistics failedStats = filteredElements.stream()
+      .filter(e -> TxnStore.SUCCEEDED_RESPONSE.equals(e.getState()) || 
TxnStore.FAILED_RESPONSE.equals(e.getState()))
+      
.sorted(Comparator.comparingLong(ShowCompactResponseElement::getId).reversed())
+      .limit(failedThreshold)
+
+      .filter(e -> TxnStore.FAILED_RESPONSE.equals(e.getState()))
+      
.collect(Collectors.summarizingLong(ShowCompactResponseElement::getEnqueueTime));
+
+    // If the last attempt was too long ago, ignore the failed threshold and 
try compaction again
+    long retryTime = MetastoreConf.getTimeVar(conf,
+      MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, 
TimeUnit.MILLISECONDS);
+
+    boolean needsRetry = (retryTime > 0) && (failedStats.getMax() + retryTime 
< System.currentTimeMillis());
+    if (failedStats.getCount() == failedThreshold && !needsRetry) {
+      LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() 
+ " since last " +
+        MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " 
attempts to compact it failed.");
+
+      ci.errorMessage = "Compaction is not initiated since last " +
+        MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " 
consecutive compaction attempts failed)";
+
+      txnHandler.markFailed(ci);
+      return true;
     }
     return false;
   }
@@ -469,14 +503,11 @@ public class Initiator extends MetaStoreCompactorThread {
 
       LOG.info("Checking to see if we should compact " + 
ci.getFullPartitionName());
 
-      // Check if we already have initiated or are working on a compaction for 
this partition
-      // or table. If so, skip it. If we are just waiting on cleaning we can 
still check,
-      // as it may be time to compact again even though we haven't cleaned.
-      // todo: this is not robust. You can easily run `alter table` to start a 
compaction between
-      // the time currentCompactions is generated and now
-      if (lookForCurrentCompactions(currentCompactions, ci)) {
-        LOG.info("Found currently initiated or working compaction for " +
-            ci.getFullPartitionName() + " so we will not initiate another 
compaction");
+      // Check if we have already initiated or are working on a compaction for 
this table/partition.
+      // Also make sure we haven't exceeded configured number of consecutive 
failures.
+      // If any of the above applies, skip it.
+      // Note: if we are just waiting on cleaning we can still check, as it 
may be time to compact again even though we haven't cleaned.
+      if (foundCurrentOrFailedCompactions(currentCompactions, ci)) {
         return false;
       }
 
@@ -508,14 +539,6 @@ public class Initiator extends MetaStoreCompactorThread {
         return false;
       }
 
-      if (txnHandler.checkFailedCompactions(ci)) {
-        LOG.warn("Will not initiate compaction for " + 
ci.getFullPartitionName() + " since last " +
-            MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " 
attempts to compact it failed.");
-        ci.errorMessage = "Compaction is not initiated since last " +
-            MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " 
consecutive compaction attempts failed)";
-        txnHandler.markFailed(ci);
-        return false;
-      }
     } catch (Throwable e) {
       LOG.error("Caught exception while checking compaction eligibility.", e);
       try {

Reply via email to