This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 3f0399c  Fixes #2013 and prevents commit after tablet closes
3f0399c is described below

commit 3f0399cf5df69bd8d69cf198d8882a239573f08d
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Apr 13 18:11:22 2021 -0400

    Fixes #2013 and prevents commit after tablet closes
---
 .../coordinator/DeadCompactionDetector.java        | 37 ++++++++++
 .../tserver/compactions/CompactionManager.java     |  4 -
 .../accumulo/tserver/tablet/CompactableImpl.java   | 85 ++++++++++++++--------
 3 files changed, 92 insertions(+), 34 deletions(-)

diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
index ed4419a..1feb7f5 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
@@ -20,7 +20,10 @@ package org.apache.accumulo.coordinator;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -97,6 +100,34 @@ public class DeadCompactionDetector {
     finalizer.failCompactions(tabletCompactions);
   }
 
+  private void detectDanglingFinalStateMarkers() {
+    Iterator<ExternalCompactionId> iter = context.getAmple().getExternalCompactionFinalStates()
+        .map(ecfs -> ecfs.getExternalCompactionId()).iterator();
+    Set<ExternalCompactionId> danglingEcids = new HashSet<>();
+
+    while (iter.hasNext()) {
+      danglingEcids.add(iter.next());
+
+      if (danglingEcids.size() > 10000) {
+        checkForDanglingMarkers(danglingEcids);
+        danglingEcids.clear();
+      }
+    }
+
+    checkForDanglingMarkers(danglingEcids);
+  }
+
+  private void checkForDanglingMarkers(Set<ExternalCompactionId> danglingEcids) {
+    context.getAmple().readTablets().forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build()
+        .stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream())
+        .forEach(danglingEcids::remove);
+
+    danglingEcids.forEach(
+        ecid -> log.debug("Detected dangling external compaction final state marker {}", ecid));
+
+    context.getAmple().deleteExternalCompactionFinalStates(danglingEcids);
+  }
+
   public void start() {
     Threads.createThread("DeadCompactionDetector", () -> {
       while (!Thread.currentThread().isInterrupted()) {
@@ -107,6 +138,12 @@ public class DeadCompactionDetector {
           log.warn("Failed to look for dead compactions", e);
         }
 
+        try {
+          detectDanglingFinalStateMarkers();
+        } catch (RuntimeException e) {
+          log.warn("Failed to look for dangling compaction final state markers", e);
+        }
+
         // TODO make bigger
         UtilWaitThread.sleep(30_000);
       }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index 8228d12..be75ed6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -459,8 +459,6 @@ public class CompactionManager {
         compactablesToCheck.add(tablet.asCompactable());
       }
       runningExternalCompactions.remove(extCompactionId);
-    } else {
-      ctx.getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId));
     }
   }
 
@@ -481,8 +479,6 @@ public class CompactionManager {
         compactablesToCheck.add(tablet.asCompactable());
       }
       runningExternalCompactions.remove(ecid);
-    } else {
-      ctx.getAmple().deleteExternalCompactionFinalStates(List.of(ecid));
     }
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 09480f7..177358c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -130,6 +130,8 @@ public class CompactableImpl implements Compactable {
   private Map<ExternalCompactionId,ExternalCompactionInfo> externalCompactions =
       new ConcurrentHashMap<>();
 
+  private Set<ExternalCompactionId> externalCompactionsCommitting = new HashSet<>();
+
   // This interface exists for two purposes. First it allows abstraction of new and old
   // implementations for user pluggable file selection code. Second it facilitates placing code
   // outside of this class.
@@ -792,16 +794,22 @@ public class CompactableImpl implements Compactable {
   @Override
   public void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize,
       long entries) {
-    // CBUG double check w/ java docs that only one thread can remove
-    ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId);
-
-    if (ecInfo != null) {
-      synchronized (ecInfo) {
-        if (!externalCompactions.containsKey(extCompactionId)) {
-          // since this method is called by RPCs there could be multiple concurrent calls so defend
-          // against that
-          return;
-        }
+
+    synchronized (this) {
+      if (closed)
+        return;
+
+      // defend against multiple threads trying to commit the same ECID and force tablet close to
+      // wait on any pending commits
+      if (!externalCompactionsCommitting.add(extCompactionId)) {
+        return;
+      }
+    }
+    try {
+
+      ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId);
+
+      if (ecInfo != null) {
         log.debug("Attempting to commit external compaction {}", extCompactionId);
         // TODO do a sanity check that files exists in dfs?
         StoredTabletFile metaFile = null;
@@ -819,40 +827,55 @@ public class CompactableImpl implements Compactable {
           externalCompactions.remove(extCompactionId);
           log.debug("Completed commit of external compaction {}", extCompactionId);
         }
+      } else {
+        log.debug("Ignoring request to commit external compaction that is unknown {}",
+            extCompactionId);
       }
 
-    } else {
-      log.debug("Ignoring request to commit external compaction that is unknown {}",
-          extCompactionId);
+      tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId));
+    } finally {
+      synchronized (this) {
+        Preconditions.checkState(externalCompactionsCommitting.remove(extCompactionId));
+        Preconditions.checkState(!closed);
+        notifyAll();
+      }
     }
-
-    tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId));
   }
 
   @Override
   public void externalCompactionFailed(ExternalCompactionId ecid) {
-    ExternalCompactionInfo ecInfo = externalCompactions.get(ecid);
-
-    if (ecInfo != null) {
-      synchronized (ecInfo) {
-        if (!externalCompactions.containsKey(ecid)) {
-          // since this method is called by RPCs there could be multiple concurrent calls so defend
-          // against that
-          return;
-        }
 
+    synchronized (this) {
+      if (closed)
+        return;
+
+      if (!externalCompactionsCommitting.add(ecid)) {
+        return;
+      }
+    }
+    try {
+
+      ExternalCompactionInfo ecInfo = externalCompactions.get(ecid);
+
+      if (ecInfo != null) {
         // CBUG review following code to ensure its idempotent
         tablet.getContext().getAmple().mutateTablet(getExtent()).deleteExternalCompaction(ecid)
             .mutate();
         completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), null);
         externalCompactions.remove(ecid);
         log.debug("Processed external compaction failure {}", ecid);
+      } else {
+        log.debug("Ignoring request to fail external compaction that is unknown {}", ecid);
       }
-    } else {
-      log.debug("Ignoring request to fail external compaction that is unknown {}", ecid);
-    }
 
-    tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid));
+      tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid));
+    } finally {
+      synchronized (this) {
+        Preconditions.checkState(externalCompactionsCommitting.remove(ecid));
+        Preconditions.checkState(!closed);
+        notifyAll();
+      }
+    }
   }
 
   @Override
@@ -942,8 +965,10 @@ public class CompactableImpl implements Compactable {
 
     closed = true;
 
-    // wait while internal jobs are running
-    while (runnningJobs.stream().anyMatch(job -> !job.getExecutor().isExernalId())) {
+    // wait while internal jobs are running or external compactions are committing, but do not wait
+    // on external compactions that are running
+    while (runnningJobs.stream().anyMatch(job -> !job.getExecutor().isExernalId())
+        || !externalCompactionsCommitting.isEmpty()) {
       try {
         wait(50);
       } catch (InterruptedException e) {

Reply via email to