This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 3f0399c Fixes #2013 and prevents commit after tablet closes 3f0399c is described below commit 3f0399cf5df69bd8d69cf198d8882a239573f08d Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Apr 13 18:11:22 2021 -0400 Fixes #2013 and prevents commit after tablet closes --- .../coordinator/DeadCompactionDetector.java | 37 ++++++++++ .../tserver/compactions/CompactionManager.java | 4 - .../accumulo/tserver/tablet/CompactableImpl.java | 85 ++++++++++++++-------- 3 files changed, 92 insertions(+), 34 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java index ed4419a..1feb7f5 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java @@ -20,7 +20,10 @@ package org.apache.accumulo.coordinator; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Set; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; @@ -97,6 +100,34 @@ public class DeadCompactionDetector { finalizer.failCompactions(tabletCompactions); } + private void detectDanglingFinalStateMarkers() { + Iterator<ExternalCompactionId> iter = context.getAmple().getExternalCompactionFinalStates() + .map(ecfs -> ecfs.getExternalCompactionId()).iterator(); + Set<ExternalCompactionId> danglingEcids = new HashSet<>(); + + while (iter.hasNext()) { + danglingEcids.add(iter.next()); + + if (danglingEcids.size() > 10000) { + checkForDanglingMarkers(danglingEcids); + danglingEcids.clear(); + } + } + + 
checkForDanglingMarkers(danglingEcids); + } + + private void checkForDanglingMarkers(Set<ExternalCompactionId> danglingEcids) { + context.getAmple().readTablets().forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build() + .stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream()) + .forEach(danglingEcids::remove); + + danglingEcids.forEach( + ecid -> log.debug("Detected dangling external compaction final state marker {}", ecid)); + + context.getAmple().deleteExternalCompactionFinalStates(danglingEcids); + } + public void start() { Threads.createThread("DeadCompactionDetector", () -> { while (!Thread.currentThread().isInterrupted()) { @@ -107,6 +138,12 @@ public class DeadCompactionDetector { log.warn("Failed to look for dead compactions", e); } + try { + detectDanglingFinalStateMarkers(); + } catch (RuntimeException e) { + log.warn("Failed to look for dangling compaction final state markers", e); + } + // TODO make bigger UtilWaitThread.sleep(30_000); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index 8228d12..be75ed6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -459,8 +459,6 @@ public class CompactionManager { compactablesToCheck.add(tablet.asCompactable()); } runningExternalCompactions.remove(extCompactionId); - } else { - ctx.getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId)); } } @@ -481,8 +479,6 @@ public class CompactionManager { compactablesToCheck.add(tablet.asCompactable()); } runningExternalCompactions.remove(ecid); - } else { - ctx.getAmple().deleteExternalCompactionFinalStates(List.of(ecid)); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 09480f7..177358c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -130,6 +130,8 @@ public class CompactableImpl implements Compactable { private Map<ExternalCompactionId,ExternalCompactionInfo> externalCompactions = new ConcurrentHashMap<>(); + private Set<ExternalCompactionId> externalCompactionsCommitting = new HashSet<>(); + // This interface exists for two purposes. First it allows abstraction of new and old // implementations for user pluggable file selection code. Second it facilitates placing code // outside of this class. @@ -792,16 +794,22 @@ public class CompactableImpl implements Compactable { @Override public void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize, long entries) { - // CBUG double check w/ java docs that only one thread can remove - ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId); - - if (ecInfo != null) { - synchronized (ecInfo) { - if (!externalCompactions.containsKey(extCompactionId)) { - // since this method is called by RPCs there could be multiple concurrent calls so defend - // against that - return; - } + + synchronized (this) { + if (closed) + return; + + // defend against multiple threads trying to commit the same ECID and force tablet close to + // wait on any pending commits + if (!externalCompactionsCommitting.add(extCompactionId)) { + return; + } + } + try { + + ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId); + + if (ecInfo != null) { log.debug("Attempting to commit external compaction {}", extCompactionId); // TODO do a sanity check that files exists in dfs? 
StoredTabletFile metaFile = null; @@ -819,40 +827,55 @@ public class CompactableImpl implements Compactable { externalCompactions.remove(extCompactionId); log.debug("Completed commit of external compaction {}", extCompactionId); } + } else { + log.debug("Ignoring request to commit external compaction that is unknown {}", + extCompactionId); } - } else { - log.debug("Ignoring request to commit external compaction that is unknown {}", - extCompactionId); + tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId)); + } finally { + synchronized (this) { + Preconditions.checkState(externalCompactionsCommitting.remove(extCompactionId)); + Preconditions.checkState(!closed); + notifyAll(); + } } - - tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(extCompactionId)); } @Override public void externalCompactionFailed(ExternalCompactionId ecid) { - ExternalCompactionInfo ecInfo = externalCompactions.get(ecid); - - if (ecInfo != null) { - synchronized (ecInfo) { - if (!externalCompactions.containsKey(ecid)) { - // since this method is called by RPCs there could be multiple concurrent calls so defend - // against that - return; - } + synchronized (this) { + if (closed) + return; + + if (!externalCompactionsCommitting.add(ecid)) { + return; + } + } + try { + + ExternalCompactionInfo ecInfo = externalCompactions.get(ecid); + + if (ecInfo != null) { // CBUG review following code to ensure its idempotent tablet.getContext().getAmple().mutateTablet(getExtent()).deleteExternalCompaction(ecid) .mutate(); completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), null); externalCompactions.remove(ecid); log.debug("Processed external compaction failure {}", ecid); + } else { + log.debug("Ignoring request to fail external compaction that is unknown {}", ecid); } - } else { - log.debug("Ignoring request to fail external compaction that is unknown {}", ecid); - } - 
tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid)); + tablet.getContext().getAmple().deleteExternalCompactionFinalStates(List.of(ecid)); + } finally { + synchronized (this) { + Preconditions.checkState(externalCompactionsCommitting.remove(ecid)); + Preconditions.checkState(!closed); + notifyAll(); + } + } } @Override @@ -942,8 +965,10 @@ public class CompactableImpl implements Compactable { closed = true; - // wait while internal jobs are running - while (runnningJobs.stream().anyMatch(job -> !job.getExecutor().isExernalId())) { + // wait while internal jobs are running or external compactions are committing, but do not wait + // on external compactions that are running + while (runnningJobs.stream().anyMatch(job -> !job.getExecutor().isExernalId()) + || !externalCompactionsCommitting.isEmpty()) { try { wait(50); } catch (InterruptedException e) {