This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 3dc2bb7 Make DeadCompactionDetector handle network hiccups (#2132) 3dc2bb7 is described below commit 3dc2bb736995ef9a4bd78eedaeda2e2c59585338 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Jun 7 08:18:24 2021 -0400 Make DeadCompactionDetector handle network hiccups (#2132) Modified the DeadCompactionDetector to fail compactions if they are dead for more than two cycles. This should handle the case where there is a transient network issue talking to another component. Closes #2125 --- .../coordinator/DeadCompactionDetector.java | 43 +++++++++++++++++++--- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java index 1b5bd28..3b6a3b4 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java @@ -21,8 +21,12 @@ package org.apache.accumulo.coordinator; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -40,13 +44,15 @@ public class DeadCompactionDetector { private final ServerContext context; private final CompactionCoordinator coordinator; - private ScheduledThreadPoolExecutor schedExecutor; + private final ScheduledThreadPoolExecutor schedExecutor; + private final ConcurrentHashMap<ExternalCompactionId,Long> deadCompactions; public DeadCompactionDetector(ServerContext context, CompactionCoordinator coordinator, ScheduledThreadPoolExecutor stpe) { this.context = context; this.coordinator = coordinator; this.schedExecutor = stpe; + this.deadCompactions = new ConcurrentHashMap<>(); } private void detectDeadCompactions() { @@ -66,6 +72,9 @@ public class DeadCompactionDetector { }); if (tabletCompactions.isEmpty()) { + // Clear out dead compactions, tservers don't think anything is running + log.trace("Clearing the dead compaction map, no tablets have compactions running"); + this.deadCompactions.clear(); // no need to look for dead compactions when tablets don't have anything recorded as running return; } @@ -74,6 +83,10 @@ public class DeadCompactionDetector { tabletCompactions.forEach((ecid, extent) -> log.trace("Saw {} for {}", ecid, extent)); } + // Remove from the dead map any compactions that the Tablet's + // do not think are running any more. + this.deadCompactions.keySet().retainAll(tabletCompactions.keySet()); + // Determine what compactions are currently running and remove those. // // In order for this overall algorithm to be correct and avoid race conditions, the compactor @@ -84,21 +97,41 @@ public class DeadCompactionDetector { running.forEach((ecid) -> { if (tabletCompactions.remove(ecid) != null) { - log.trace("Removed {} running on a compactor", ecid); + log.trace("Removed compaction {} running on a compactor", ecid); + } + if (this.deadCompactions.remove(ecid) != null) { + log.trace("Removed {} from the dead compaction map, it's running on a compactor", ecid); } }); // Determine which compactions are currently committing and remove those context.getAmple().getExternalCompactionFinalStates() - .map(ecfs -> ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove); + .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> { + if (tabletCompactions.remove(ecid) != null) { + log.trace("Removed compaction {} that is committing", ecid); + } + if (this.deadCompactions.remove(ecid) != null) { + log.trace("Removed {} from the dead compaction map, it's committing", ecid); + } + }); - tabletCompactions - .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", ecid, extent)); + tabletCompactions.forEach((ecid, extent) -> { + log.debug("Possible dead compaction detected {} {}", ecid, extent); + this.deadCompactions.merge(ecid, 1L, Long::sum); + }); // Everything left in tabletCompactions is no longer running anywhere and should be failed. // Its possible that a compaction committed while going through the steps above, if so then // that is ok and marking it failed will end up being a no-op. + Set<ExternalCompactionId> toFail = + this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 2).map(e -> e.getKey()) + .collect(Collectors.toCollection(TreeSet::new)); + tabletCompactions.keySet().retainAll(toFail); + tabletCompactions.forEach((eci, v) -> { + log.warn("Compaction {} believed to be dead, failing it.", eci); + }); coordinator.compactionFailed(tabletCompactions); + this.deadCompactions.keySet().removeAll(toFail); } public void start() {