This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new f3ece88 Get metrics for external compactions working f3ece88 is described below commit f3ece8801ebf027bb1e7b3b665bd14ae90012c6f Author: Keith Turner <ktur...@apache.org> AuthorDate: Sun Apr 4 13:12:31 2021 -0400 Get metrics for external compactions working This change provides external compaction info to the existing tserver compaction metrics code. In theory, this should cause tservers to emit metrics for running and queued external compactions; however, it has not been tested yet. This change cleans up unused ExternalCompactionExecutors and ExternalCompaction information in the tserver. This cleanup was needed to bound memory usage and to support accurate metrics. Added strong sanity checks to the sets of files passed between a tablet and a compaction planner. These checks exposed a transient problem that needs further investigation, but the problem does not prevent compactions. For a short time, a tablet tries to pass bad information to a planner, and the sanity checks prevent that information from being passed. 
--- .../accumulo/coordinator/CompactionFinalizer.java | 16 ++++++--- .../accumulo/tserver/compactions/Compactable.java | 16 +++++++++ .../tserver/compactions/CompactionExecutor.java | 8 +++-- .../tserver/compactions/CompactionManager.java | 24 ++++++++++--- .../tserver/compactions/CompactionService.java | 24 +++++++++---- .../compactions/ExternalCompactionExecutor.java | 42 +++++++++++----------- .../compactions/InternalCompactionExecutor.java | 8 +++-- .../accumulo/tserver/tablet/CompactableImpl.java | 6 ++++ 8 files changed, 102 insertions(+), 42 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index f7e19bc..a96f891 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -53,7 +53,8 @@ public class CompactionFinalizer { private static final Logger LOG = LoggerFactory.getLogger(CompactionFinalizer.class); private final ServerContext context; - private final ExecutorService executor; + private final ExecutorService ntfyExecutor; + private final ExecutorService backgroundExecutor; private final BlockingQueue<ExternalCompactionFinalState> pendingNotifications; CompactionFinalizer(ServerContext context) { @@ -62,13 +63,17 @@ public class CompactionFinalizer { // CBUG configure thread factory // CBUG make pool size configurable? 
- this.executor = ThreadPools.createFixedThreadPool(3, "CompactionFinalizer", false); + this.ntfyExecutor = + ThreadPools.createFixedThreadPool(3, "Compaction Finalizer Notifyer", false); - executor.execute(() -> { + this.backgroundExecutor = + ThreadPools.createFixedThreadPool(2, "Compaction Finalizer Background Task", false); + + backgroundExecutor.execute(() -> { processPending(); }); - executor.execute(() -> { + backgroundExecutor.execute(() -> { notifyTservers(); }); } @@ -145,7 +150,8 @@ public class CompactionFinalizer { if (tabletMetadata != null && tabletMetadata.getExtent().equals(ecfs.getExtent()) && tabletMetadata.getLocation() != null && tabletMetadata.getLocation().getType() == LocationType.CURRENT) { - futures.add(executor.submit(() -> notifyTserver(tabletMetadata.getLocation(), ecfs))); + futures + .add(ntfyExecutor.submit(() -> notifyTserver(tabletMetadata.getLocation(), ecfs))); } else { // this is an unknown tablet so need to delete its final state marker from metadata // table diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java index b1a920e..be70321 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.SortedMap; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; @@ -38,6 +39,8 @@ import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; import org.apache.accumulo.core.util.ratelimit.RateLimiter; +import com.google.common.base.Preconditions; + /** * Interface between compaction service and tablet. 
*/ @@ -66,6 +69,17 @@ public interface Compactable { .map(stf -> new CompactableFileImpl(stf, allFiles.get(stf))).collect(Collectors.toSet())); this.compacting = Set.copyOf(running); + + // Do some sanity checks on sets of files + Preconditions.checkArgument(this.allFiles.containsAll(this.candidates), + "Candidates not in set of all files %s %s", this.allFiles, this.candidates); + var compactingFiles = + compacting.stream().flatMap(job -> job.getFiles().stream()).collect(Collectors.toSet()); + Preconditions.checkArgument(this.allFiles.containsAll(compactingFiles), + "Compacting not in set of all files %s %s", this.allFiles, compactingFiles); + Preconditions.checkArgument(Collections.disjoint(compactingFiles, this.candidates), + "Compacting and candidates overlap %s %s", compactingFiles, this.candidates); + this.executionHints = executionHints; } @@ -98,4 +112,6 @@ public interface Compactable { void externalCompactionFailed(ExternalCompactionId ecid); boolean isActive(ExternalCompactionId ecid); + + void getExternalCompactionIds(Consumer<ExternalCompactionId> tmp); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java index 974d1f3..20e8cfa 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java @@ -28,9 +28,13 @@ public interface CompactionExecutor { SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compactable compactable, Consumer<Compactable> completionCallback); - int getCompactionsRunning(); + enum CType { + INTERNAL, EXTERNAL + } - int getCompactionsQueued(); + int getCompactionsRunning(CType ctype); + + int getCompactionsQueued(CType ctype); void stop(); diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index e68294a..5584b61 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.compactions; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -42,6 +43,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.fate.util.Retry; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.tserver.compactions.CompactionExecutor.CType; import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics; import org.apache.accumulo.tserver.tablet.Tablet; import org.slf4j.Logger; @@ -74,7 +76,6 @@ public class CompactionManager { private Map<CompactionExecutorId,ExternalCompactionExecutor> externalExecutors; - // TODO this may need to be garbage collected... 
also will need to be populated when tablet load private Map<ExternalCompactionId,KeyExtent> runningExternalCompactions; private class Config { @@ -235,11 +236,15 @@ public class CompactionManager { long passed = TimeUnit.MILLISECONDS.convert(System.nanoTime() - lastCheckAllTime, TimeUnit.NANOSECONDS); if (passed >= maxTimeBetweenChecks) { + HashSet<ExternalCompactionId> observedEcids = new HashSet<>(); for (Compactable compactable : compactables) { last = compactable; compact(compactable); + compactable.getExternalCompactionIds(observedEcids::add); } lastCheckAllTime = System.nanoTime(); + // clean up any external compactions that are not currently running + runningExternalCompactions.keySet().retainAll(observedEcids); } else { var compactable = compactablesToCheck.poll(maxTimeBetweenChecks - passed, TimeUnit.MILLISECONDS); @@ -370,6 +375,12 @@ public class CompactionManager { } this.services = Map.copyOf(tmpServices); + + HashSet<CompactionExecutorId> activeExternalExecs = new HashSet<>(); + services.values().forEach(cs -> cs.getExternalExecutorsInUse(activeExternalExecs::add)); + // clean up an external compactors that are no longer in use by any compaction service + externalExecutors.keySet().retainAll(activeExternalExecs); + } } catch (RuntimeException e) { log.error("Failed to reconfigure compaction services ", e); @@ -398,14 +409,16 @@ public class CompactionManager { } public int getCompactionsRunning() { - return services.values().stream().mapToInt(CompactionService::getCompactionsRunning).sum(); + return services.values().stream().mapToInt(cs -> cs.getCompactionsRunning(CType.INTERNAL)).sum() + + runningExternalCompactions.size(); } public int getCompactionsQueued() { - return services.values().stream().mapToInt(CompactionService::getCompactionsQueued).sum(); + return services.values().stream().mapToInt(cs -> cs.getCompactionsQueued(CType.INTERNAL)).sum() + + externalExecutors.values().stream() + .mapToInt(ee -> 
ee.getCompactionsQueued(CType.EXTERNAL)).sum(); } - // CBUG would be nice to create a CompactorId type and use that instead of string. public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority, String compactorId, ExternalCompactionId externalCompactionId) { log.debug("Attempting to reserve external compaction, queue:{} priority:{} compactor:{}", @@ -439,6 +452,7 @@ public class CompactionManager { Tablet tablet = currentTablets.get(extent); if (tablet != null) { tablet.asCompactable().commitExternalCompaction(extCompactionId, fileSize, entries); + compactablesToCheck.add(tablet.asCompactable()); } runningExternalCompactions.remove(extCompactionId); } @@ -456,13 +470,13 @@ public class CompactionManager { Tablet tablet = currentTablets.get(extent); if (tablet != null) { tablet.asCompactable().externalCompactionFailed(ecid); + compactablesToCheck.add(tablet.asCompactable()); } runningExternalCompactions.remove(ecid); } } public List<TCompactionQueueSummary> getCompactionQueueSummaries() { - // TODO Auto-generated method stub return externalExecutors.values().stream().map(ece -> ece.summarize()) .collect(Collectors.toList()); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index 6ec0831..134c7d4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -58,6 +58,7 @@ import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.accumulo.tserver.compactions.CompactionExecutor.CType; import org.apache.accumulo.tserver.compactions.SubmittedJob.Status; 
import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics; import org.slf4j.Logger; @@ -70,8 +71,6 @@ public class CompactionService { private CompactionPlanner planner; private Map<CompactionExecutorId,CompactionExecutor> executors; private final CompactionServiceId myId; - // CBUG does this need to be populated w/ external compactions when a tablet is loaded OR should - // this even contain external compactions? private Map<KeyExtent,Collection<SubmittedJob>> submittedJobs = new ConcurrentHashMap<>(); private ServerContext serverCtx; private String plannerClassName; @@ -210,6 +209,10 @@ public class CompactionService { } } } else if (status == Status.RUNNING) { + // Note the submitted jobs set may not contain external compactions that started on another + // tserver. However, this is ok as the purpose of this check is to look for compaction jobs + // that transitioned from QUEUED to RUNNING during planning. Any external compactions + // started on another tserver will not make this transition during planning. 
for (CompactionJob job : jobs) { if (!Collections.disjoint(submittedJob.getJob().getFiles(), job.getFiles())) { return false; @@ -406,7 +409,6 @@ public class CompactionService { }); Sets.difference(executors.keySet(), tmpExecutors.keySet()).forEach(ceid -> { - // TODO may not make sense for external executors.get(ceid).stop(); }); @@ -424,11 +426,19 @@ public class CompactionService { executors.values().forEach(CompactionExecutor::stop); } - int getCompactionsRunning() { - return executors.values().stream().mapToInt(CompactionExecutor::getCompactionsRunning).sum(); + int getCompactionsRunning(CType ctype) { + return executors.values().stream().mapToInt(ce -> ce.getCompactionsRunning(ctype)).sum(); } - int getCompactionsQueued() { - return executors.values().stream().mapToInt(CompactionExecutor::getCompactionsQueued).sum(); + int getCompactionsQueued(CType ctype) { + return executors.values().stream().mapToInt(ce -> ce.getCompactionsQueued(ctype)).sum(); + } + + public void getExternalExecutorsInUse(Consumer<CompactionExecutorId> idConsumer) { + executors.forEach((ceid, ce) -> { + if (ce instanceof ExternalCompactionExecutor) { + idConsumer.accept(ceid); + } + }); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java index 944dc9c..7785b19 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java @@ -18,6 +18,9 @@ */ package org.apache.accumulo.tserver.compactions; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -31,21 +34,19 @@ import 
org.apache.accumulo.tserver.compactions.SubmittedJob.Status; public class ExternalCompactionExecutor implements CompactionExecutor { + private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new HashSet<>()); + private class ExternalJob extends SubmittedJob implements Comparable<ExternalJob> { private AtomicReference<Status> status = new AtomicReference<>(Status.QUEUED); private Compactable compactable; private CompactionServiceId csid; - private Consumer<Compactable> completionCallback; - private final long queuedTime; private volatile ExternalCompactionId ecid; - public ExternalJob(CompactionJob job, Compactable compactable, CompactionServiceId csid, - Consumer<Compactable> completionCallback) { + public ExternalJob(CompactionJob job, Compactable compactable, CompactionServiceId csid) { super(job); this.compactable = compactable; this.csid = csid; - this.completionCallback = completionCallback; - queuedTime = System.currentTimeMillis(); + queuedTask.add(this); } @Override @@ -65,6 +66,9 @@ public class ExternalCompactionExecutor implements CompactionExecutor { if (expectedStatus == Status.QUEUED) { canceled = status.compareAndSet(expectedStatus, Status.CANCELED); + if (canceled) { + queuedTask.remove(this); + } } return canceled; @@ -92,28 +96,27 @@ public class ExternalCompactionExecutor implements CompactionExecutor { @Override public SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compactable compactable, Consumer<Compactable> completionCallback) { - ExternalJob extJob = new ExternalJob(job, compactable, csid, completionCallback); + ExternalJob extJob = new ExternalJob(job, compactable, csid); queue.add(extJob); return extJob; } @Override - public int getCompactionsRunning() { - // TODO Auto-generated method stub + public int getCompactionsRunning(CType ctype) { + if (ctype == CType.EXTERNAL) + throw new UnsupportedOperationException(); return 0; } @Override - public int getCompactionsQueued() { - // TODO Auto-generated method stub - 
return 0; + public int getCompactionsQueued(CType ctype) { + if (ctype != CType.EXTERNAL) + return 0; + return queuedTask.size(); } @Override - public void stop() { - // TODO Auto-generated method stub - - } + public void stop() {} ExternalCompactionJob reserveExternalCompaction(long priority, String compactorId, ExternalCompactionId externalCompactionId) { @@ -132,6 +135,7 @@ public class ExternalCompactionExecutor implements CompactionExecutor { var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, extJob.getJob(), compactorId, externalCompactionId); extJob.ecid = ecj.getExternalCompactionId(); + queuedTask.remove(extJob); return ecj; } else { // TODO could this cause a stack overflow? @@ -142,16 +146,12 @@ public class ExternalCompactionExecutor implements CompactionExecutor { queue.add(extJob); } - // TODO Auto-generated method stub return null; } // TODO maybe create non-thrift type to avoid thrift types all over the code public TCompactionQueueSummary summarize() { - // TODO maybe try to keep this precomputed to avoid looping over entire queue for each request - // TODO if count is not needed would not even need to loop over entire queue - // TODO cast to int is problematic - int count = (int) queue.stream().filter(extJob -> extJob.status.get() == Status.QUEUED).count(); + int count = queuedTask.size(); long priority = 0; ExternalJob topJob = queue.peek(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java index ace93d8..44e74e4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java @@ -198,12 +198,16 @@ public class InternalCompactionExecutor implements CompactionExecutor { } @Override - public int 
getCompactionsRunning() { + public int getCompactionsRunning(CType ctype) { + if (ctype != CType.INTERNAL) + return 0; return threadPool.getActiveCount(); } @Override - public int getCompactionsQueued() { + public int getCompactionsQueued(CType ctype) { + if (ctype != CType.INTERNAL) + return 0; return queuedTask.size(); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 534c9b7..97175de 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -33,6 +33,7 @@ import java.util.SortedMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -858,6 +859,11 @@ public class CompactableImpl implements Compactable { } @Override + public void getExternalCompactionIds(Consumer<ExternalCompactionId> idConsumer) { + externalCompactions.forEach((ecid, eci) -> idConsumer.accept(ecid)); + } + + @Override public CompactionServiceId getConfiguredService(CompactionKind kind) { Map<String,String> debugHints = null;