This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit b9ddb33c5920663990885bae5a17b90f0976315f Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu May 6 13:50:32 2021 -0400 Cleans up compactors in ZK and use single sched executor in coordinator --- .../coordinator/CompactionCoordinator.java | 78 +++++++++++++++++++--- .../accumulo/coordinator/CompactionFinalizer.java | 43 +++++------- .../coordinator/DeadCompactionDetector.java | 42 ++++++------ .../coordinator/CompactionCoordinatorTest.java | 5 +- .../TestCompactionCoordinatorForOfflineTable.java | 10 +-- 5 files changed, 115 insertions(+), 63 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index a6aa936..b7dffb1 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver; @@ -60,6 +61,7 @@ import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.fate.zookeeper.ServiceLock; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerOpts; @@ -111,28 +113,35 @@ public class CompactionCoordinator extends AbstractServer // Exposed for tests protected volatile Boolean shutdown = false; + private ScheduledThreadPoolExecutor schedExecutor; + protected CompactionCoordinator(ServerOpts opts, String[] args) { super("compaction-coordinator", opts, args); aconf = getConfiguration(); - compactionFinalizer = createCompactionFinalizer(); + schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + compactionFinalizer = createCompactionFinalizer(schedExecutor); tserverSet = createLiveTServerSet(); setupSecurity(); - startGCLogger(); + startGCLogger(schedExecutor); printStartupMsg(); + startCompactionCleaner(schedExecutor); } protected CompactionCoordinator(ServerOpts opts, String[] args, AccumuloConfiguration conf) { super("compaction-coordinator", opts, args); aconf = conf; - compactionFinalizer = createCompactionFinalizer(); + schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf); + compactionFinalizer = createCompactionFinalizer(schedExecutor); tserverSet = createLiveTServerSet(); setupSecurity(); - startGCLogger(); + startGCLogger(schedExecutor); printStartupMsg(); + startCompactionCleaner(schedExecutor); } - protected CompactionFinalizer createCompactionFinalizer() { - return new CompactionFinalizer(getContext()); + protected CompactionFinalizer + createCompactionFinalizer(ScheduledThreadPoolExecutor schedExecutor) { + return new CompactionFinalizer(getContext(), schedExecutor); } protected LiveTServerSet createLiveTServerSet() { @@ -144,10 +153,13 @@ public class CompactionCoordinator extends AbstractServer security = AuditedSecurityOperation.getInstance(getContext()); } - protected void startGCLogger() { - ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay( - () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_CHECKS, - TimeUnit.MILLISECONDS); + protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) { + schedExecutor.scheduleWithFixedDelay(() -> gcLogger.logGCInfo(getConfiguration()), 0, + TIME_BETWEEN_CHECKS, TimeUnit.MILLISECONDS); + } + + private void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { + schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, TimeUnit.MINUTES); } protected void printStartupMsg() { @@ -353,7 +365,7 @@ public class CompactionCoordinator extends AbstractServer } tserverSet.startListeningForTabletServerChanges(); - new DeadCompactionDetector(getContext(), compactionFinalizer).start(); + new DeadCompactionDetector(getContext(), compactionFinalizer, schedExecutor).start(); LOG.info("Starting loop to check tservers for compaction summaries"); while (!shutdown) { @@ -698,6 +710,50 @@ public class CompactionCoordinator extends AbstractServer } } + private void deleteEmpty(ZooReaderWriter zoorw, String path) + throws KeeperException, InterruptedException { + try { + LOG.debug("Deleting empty ZK node {}", path); + zoorw.delete(path); + } catch (KeeperException.NotEmptyException e) { + LOG.debug("Failed to delete {} its not empty, likely an expected race condition.", path); + } + } + + private void cleanUpCompactors() { + final String compactorQueuesPath = getContext().getZooKeeperRoot() + Constants.ZCOMPACTORS; + + var zoorw = getContext().getZooReaderWriter(); + + try { + var queues = zoorw.getChildren(compactorQueuesPath); + + for (String queue : queues) { + String qpath = compactorQueuesPath + "/" + queue; + + var compactors = zoorw.getChildren(qpath); + + if (compactors.isEmpty()) { + deleteEmpty(zoorw, qpath); + } + + for (String compactor : compactors) { + String cpath = compactorQueuesPath + "/" + queue + "/" + compactor; + var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + queue + "/" + compactor); + if (lockNodes.isEmpty()) { + deleteEmpty(zoorw, cpath); + } + } + } + + } catch (KeeperException | RuntimeException e) { + LOG.warn("Failed to clean up compactors", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + public static void main(String[] args) throws Exception { try (CompactionCoordinator compactor = new CompactionCoordinator(new ServerOpts(), args)) { compactor.runServer(); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java index 7e673b0..6a8e256 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java @@ -27,6 +27,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -43,7 +44,6 @@ import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.threads.ThreadPools; -import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.server.ServerContext; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -59,7 +59,7 @@ public class CompactionFinalizer { private final BlockingQueue<ExternalCompactionFinalState> pendingNotifications; private final long tserverCheckInterval; - protected CompactionFinalizer(ServerContext context) { + protected CompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor schedExecutor) { this.context = context; this.pendingNotifications = new ArrayBlockingQueue<>(1000); @@ -70,18 +70,16 @@ public class CompactionFinalizer { this.ntfyExecutor = ThreadPools.createThreadPool(3, max, 1, TimeUnit.MINUTES, "Compaction Finalizer Notifier", false); - ThreadPools.createFixedThreadPool(3, "Compaction Finalizer Notifier", false); this.backgroundExecutor = - ThreadPools.createFixedThreadPool(2, "Compaction Finalizer Background Task", false); + ThreadPools.createFixedThreadPool(1, "Compaction Finalizer Background Task", false); backgroundExecutor.execute(() -> { processPending(); }); - backgroundExecutor.execute(() -> { - notifyTservers(); - }); + schedExecutor.scheduleWithFixedDelay(() -> notifyTservers(), 0, tserverCheckInterval, + TimeUnit.MILLISECONDS); } public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize, @@ -198,25 +196,20 @@ public class CompactionFinalizer { } private void notifyTservers() { - while (!Thread.interrupted()) { - try { - Iterator<ExternalCompactionFinalState> finalStates = - context.getAmple().getExternalCompactionFinalStates().iterator(); - while (finalStates.hasNext()) { - ExternalCompactionFinalState state = finalStates.next(); - LOG.info( - "Found external compaction in final state: {}, queueing for tserver notification", - state); - pendingNotifications.put(state); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (RuntimeException e) { - LOG.warn("Failed to notify tservers", e); + try { + Iterator<ExternalCompactionFinalState> finalStates = + context.getAmple().getExternalCompactionFinalStates().iterator(); + while (finalStates.hasNext()) { + ExternalCompactionFinalState state = finalStates.next(); + LOG.info("Found external compaction in final state: {}, queueing for tserver notification", + state); + pendingNotifications.put(state); } - - UtilWaitThread.sleep(tserverCheckInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (RuntimeException e) { + LOG.warn("Failed to notify tservers", e); } } } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java index afd2e2d..0ab2773 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java @@ -24,6 +24,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -31,8 +33,6 @@ import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; -import org.apache.accumulo.core.util.threads.Threads; -import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.server.ServerContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,10 +43,13 @@ public class DeadCompactionDetector { private final ServerContext context; private final CompactionFinalizer finalizer; + private ScheduledThreadPoolExecutor schedExecutor; - public DeadCompactionDetector(ServerContext context, CompactionFinalizer finalizer) { + public DeadCompactionDetector(ServerContext context, CompactionFinalizer finalizer, + ScheduledThreadPoolExecutor stpe) { this.context = context; this.finalizer = finalizer; + this.schedExecutor = stpe; } private void detectDeadCompactions() { @@ -131,24 +134,21 @@ public class DeadCompactionDetector { } public void start() { - Threads.createThread("DeadCompactionDetector", () -> { - while (!Thread.currentThread().isInterrupted()) { - long interval = this.context.getConfiguration() - .getTimeInMillis(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL); - try { - detectDeadCompactions(); - } catch (RuntimeException e) { - log.warn("Failed to look for dead compactions", e); - } - - try { - detectDanglingFinalStateMarkers(); - } catch (RuntimeException e) { - log.warn("Failed to look for dangling compaction final state markers", e); - } - - UtilWaitThread.sleep(interval); + long interval = this.context.getConfiguration() + .getTimeInMillis(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL); + + schedExecutor.scheduleWithFixedDelay(() -> { + try { + detectDeadCompactions(); + } catch (RuntimeException e) { + log.warn("Failed to look for dead compactions", e); } - }).start(); + + try { + detectDanglingFinalStateMarkers(); + } catch (RuntimeException e) { + log.warn("Failed to look for dangling compaction final state markers", e); + } + }, 0, interval, TimeUnit.MILLISECONDS); } } diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 0b0628e..172bed2 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -106,7 +107,7 @@ public class CompactionCoordinatorTest { } @Override - protected CompactionFinalizer createCompactionFinalizer() { + protected CompactionFinalizer createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) { return null; } @@ -119,7 +120,7 @@ public class CompactionCoordinatorTest { protected void setupSecurity() {} @Override - protected void startGCLogger() {} + protected void startGCLogger(ScheduledThreadPoolExecutor stpe) {} @Override protected void printStartupMsg() {} diff --git a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java index ed1306a..4ac3476 100644 --- a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java +++ b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.accumulo.coordinator.CompactionFinalizer; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -38,10 +39,11 @@ public class TestCompactionCoordinatorForOfflineTable extends TestCompactionCoor private static final Logger LOG = LoggerFactory.getLogger(NonNotifyingCompactionFinalizer.class); - NonNotifyingCompactionFinalizer(ServerContext context) { - super(context); + NonNotifyingCompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor stpe) { + super(context, stpe); } + @Override public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize, long fileEntries) { @@ -63,8 +65,8 @@ public class TestCompactionCoordinatorForOfflineTable extends TestCompactionCoor } @Override - protected CompactionFinalizer createCompactionFinalizer() { - return new NonNotifyingCompactionFinalizer(getContext()); + protected CompactionFinalizer createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) { + return new NonNotifyingCompactionFinalizer(getContext(), stpe); } public static void main(String[] args) throws Exception {