This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit b9ddb33c5920663990885bae5a17b90f0976315f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu May 6 13:50:32 2021 -0400

    Cleans up compactors in ZK and use single sched executor in coordinator
---
 .../coordinator/CompactionCoordinator.java         | 78 +++++++++++++++++++---
 .../accumulo/coordinator/CompactionFinalizer.java  | 43 +++++-------
 .../coordinator/DeadCompactionDetector.java        | 42 ++++++------
 .../coordinator/CompactionCoordinatorTest.java     |  5 +-
 .../TestCompactionCoordinatorForOfflineTable.java  | 10 +--
 5 files changed, 115 insertions(+), 63 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index a6aa936..b7dffb1 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
@@ -60,6 +61,7 @@ import 
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.server.AbstractServer;
 import org.apache.accumulo.server.GarbageCollectionLogger;
 import org.apache.accumulo.server.ServerOpts;
@@ -111,28 +113,35 @@ public class CompactionCoordinator extends AbstractServer
   // Exposed for tests
   protected volatile Boolean shutdown = false;
 
+  private ScheduledThreadPoolExecutor schedExecutor;
+
   protected CompactionCoordinator(ServerOpts opts, String[] args) {
     super("compaction-coordinator", opts, args);
     aconf = getConfiguration();
-    compactionFinalizer = createCompactionFinalizer();
+    schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf);
+    compactionFinalizer = createCompactionFinalizer(schedExecutor);
     tserverSet = createLiveTServerSet();
     setupSecurity();
-    startGCLogger();
+    startGCLogger(schedExecutor);
     printStartupMsg();
+    startCompactionCleaner(schedExecutor);
   }
 
   protected CompactionCoordinator(ServerOpts opts, String[] args, 
AccumuloConfiguration conf) {
     super("compaction-coordinator", opts, args);
     aconf = conf;
-    compactionFinalizer = createCompactionFinalizer();
+    schedExecutor = ThreadPools.createGeneralScheduledExecutorService(aconf);
+    compactionFinalizer = createCompactionFinalizer(schedExecutor);
     tserverSet = createLiveTServerSet();
     setupSecurity();
-    startGCLogger();
+    startGCLogger(schedExecutor);
     printStartupMsg();
+    startCompactionCleaner(schedExecutor);
   }
 
-  protected CompactionFinalizer createCompactionFinalizer() {
-    return new CompactionFinalizer(getContext());
+  protected CompactionFinalizer
+      createCompactionFinalizer(ScheduledThreadPoolExecutor schedExecutor) {
+    return new CompactionFinalizer(getContext(), schedExecutor);
   }
 
   protected LiveTServerSet createLiveTServerSet() {
@@ -144,10 +153,13 @@ public class CompactionCoordinator extends AbstractServer
     security = AuditedSecurityOperation.getInstance(getContext());
   }
 
-  protected void startGCLogger() {
-    
ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
-        () -> gcLogger.logGCInfo(getConfiguration()), 0, TIME_BETWEEN_CHECKS,
-        TimeUnit.MILLISECONDS);
+  protected void startGCLogger(ScheduledThreadPoolExecutor schedExecutor) {
+    schedExecutor.scheduleWithFixedDelay(() -> 
gcLogger.logGCInfo(getConfiguration()), 0,
+        TIME_BETWEEN_CHECKS, TimeUnit.MILLISECONDS);
+  }
+
+  private void startCompactionCleaner(ScheduledThreadPoolExecutor 
schedExecutor) {
+    schedExecutor.scheduleWithFixedDelay(() -> cleanUpCompactors(), 0, 5, 
TimeUnit.MINUTES);
   }
 
   protected void printStartupMsg() {
@@ -353,7 +365,7 @@ public class CompactionCoordinator extends AbstractServer
     }
 
     tserverSet.startListeningForTabletServerChanges();
-    new DeadCompactionDetector(getContext(), compactionFinalizer).start();
+    new DeadCompactionDetector(getContext(), compactionFinalizer, 
schedExecutor).start();
 
     LOG.info("Starting loop to check tservers for compaction summaries");
     while (!shutdown) {
@@ -698,6 +710,50 @@ public class CompactionCoordinator extends AbstractServer
     }
   }
 
+  private void deleteEmpty(ZooReaderWriter zoorw, String path)
+      throws KeeperException, InterruptedException {
+    try {
+      LOG.debug("Deleting empty ZK node {}", path);
+      zoorw.delete(path);
+    } catch (KeeperException.NotEmptyException e) {
+      LOG.debug("Failed to delete {} its not empty, likely an expected race 
condition.", path);
+    }
+  }
+
+  private void cleanUpCompactors() {
+    final String compactorQueuesPath = getContext().getZooKeeperRoot() + 
Constants.ZCOMPACTORS;
+
+    var zoorw = getContext().getZooReaderWriter();
+
+    try {
+      var queues = zoorw.getChildren(compactorQueuesPath);
+
+      for (String queue : queues) {
+        String qpath = compactorQueuesPath + "/" + queue;
+
+        var compactors = zoorw.getChildren(qpath);
+
+        if (compactors.isEmpty()) {
+          deleteEmpty(zoorw, qpath);
+        }
+
+        for (String compactor : compactors) {
+          String cpath = compactorQueuesPath + "/" + queue + "/" + compactor;
+          var lockNodes = zoorw.getChildren(compactorQueuesPath + "/" + queue 
+ "/" + compactor);
+          if (lockNodes.isEmpty()) {
+            deleteEmpty(zoorw, cpath);
+          }
+        }
+      }
+
+    } catch (KeeperException | RuntimeException e) {
+      LOG.warn("Failed to clean up compactors", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     try (CompactionCoordinator compactor = new CompactionCoordinator(new 
ServerOpts(), args)) {
       compactor.runServer();
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 7e673b0..6a8e256 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -43,7 +44,6 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.threads.ThreadPools;
-import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -59,7 +59,7 @@ public class CompactionFinalizer {
   private final BlockingQueue<ExternalCompactionFinalState> 
pendingNotifications;
   private final long tserverCheckInterval;
 
-  protected CompactionFinalizer(ServerContext context) {
+  protected CompactionFinalizer(ServerContext context, 
ScheduledThreadPoolExecutor schedExecutor) {
     this.context = context;
     this.pendingNotifications = new ArrayBlockingQueue<>(1000);
 
@@ -70,18 +70,16 @@ public class CompactionFinalizer {
 
     this.ntfyExecutor = ThreadPools.createThreadPool(3, max, 1, 
TimeUnit.MINUTES,
         "Compaction Finalizer Notifier", false);
-    ThreadPools.createFixedThreadPool(3, "Compaction Finalizer Notifier", 
false);
 
     this.backgroundExecutor =
-        ThreadPools.createFixedThreadPool(2, "Compaction Finalizer Background 
Task", false);
+        ThreadPools.createFixedThreadPool(1, "Compaction Finalizer Background 
Task", false);
 
     backgroundExecutor.execute(() -> {
       processPending();
     });
 
-    backgroundExecutor.execute(() -> {
-      notifyTservers();
-    });
+    schedExecutor.scheduleWithFixedDelay(() -> notifyTservers(), 0, 
tserverCheckInterval,
+        TimeUnit.MILLISECONDS);
   }
 
   public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, 
long fileSize,
@@ -198,25 +196,20 @@ public class CompactionFinalizer {
   }
 
   private void notifyTservers() {
-    while (!Thread.interrupted()) {
-      try {
-        Iterator<ExternalCompactionFinalState> finalStates =
-            context.getAmple().getExternalCompactionFinalStates().iterator();
-        while (finalStates.hasNext()) {
-          ExternalCompactionFinalState state = finalStates.next();
-          LOG.info(
-              "Found external compaction in final state: {}, queueing for 
tserver notification",
-              state);
-          pendingNotifications.put(state);
-        }
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      } catch (RuntimeException e) {
-        LOG.warn("Failed to notify tservers", e);
+    try {
+      Iterator<ExternalCompactionFinalState> finalStates =
+          context.getAmple().getExternalCompactionFinalStates().iterator();
+      while (finalStates.hasNext()) {
+        ExternalCompactionFinalState state = finalStates.next();
+        LOG.info("Found external compaction in final state: {}, queueing for 
tserver notification",
+            state);
+        pendingNotifications.put(state);
       }
-
-      UtilWaitThread.sleep(tserverCheckInterval);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (RuntimeException e) {
+      LOG.warn("Failed to notify tservers", e);
     }
   }
 }
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
index afd2e2d..0ab2773 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -31,8 +33,6 @@ import 
org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
-import org.apache.accumulo.core.util.threads.Threads;
-import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.server.ServerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,10 +43,13 @@ public class DeadCompactionDetector {
 
   private final ServerContext context;
   private final CompactionFinalizer finalizer;
+  private ScheduledThreadPoolExecutor schedExecutor;
 
-  public DeadCompactionDetector(ServerContext context, CompactionFinalizer 
finalizer) {
+  public DeadCompactionDetector(ServerContext context, CompactionFinalizer 
finalizer,
+      ScheduledThreadPoolExecutor stpe) {
     this.context = context;
     this.finalizer = finalizer;
+    this.schedExecutor = stpe;
   }
 
   private void detectDeadCompactions() {
@@ -131,24 +134,21 @@ public class DeadCompactionDetector {
   }
 
   public void start() {
-    Threads.createThread("DeadCompactionDetector", () -> {
-      while (!Thread.currentThread().isInterrupted()) {
-        long interval = this.context.getConfiguration()
-            
.getTimeInMillis(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL);
-        try {
-          detectDeadCompactions();
-        } catch (RuntimeException e) {
-          log.warn("Failed to look for dead compactions", e);
-        }
-
-        try {
-          detectDanglingFinalStateMarkers();
-        } catch (RuntimeException e) {
-          log.warn("Failed to look for dangling compaction final state 
markers", e);
-        }
-
-        UtilWaitThread.sleep(interval);
+    long interval = this.context.getConfiguration()
+        .getTimeInMillis(Property.COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL);
+
+    schedExecutor.scheduleWithFixedDelay(() -> {
+      try {
+        detectDeadCompactions();
+      } catch (RuntimeException e) {
+        log.warn("Failed to look for dead compactions", e);
       }
-    }).start();
+
+      try {
+        detectDanglingFinalStateMarkers();
+      } catch (RuntimeException e) {
+        log.warn("Failed to look for dangling compaction final state markers", 
e);
+      }
+    }, 0, interval, TimeUnit.MILLISECONDS);
   }
 }
diff --git 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 0b0628e..172bed2 100644
--- 
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++ 
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -106,7 +107,7 @@ public class CompactionCoordinatorTest {
     }
 
     @Override
-    protected CompactionFinalizer createCompactionFinalizer() {
+    protected CompactionFinalizer 
createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) {
       return null;
     }
 
@@ -119,7 +120,7 @@ public class CompactionCoordinatorTest {
     protected void setupSecurity() {}
 
     @Override
-    protected void startGCLogger() {}
+    protected void startGCLogger(ScheduledThreadPoolExecutor stpe) {}
 
     @Override
     protected void printStartupMsg() {}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java
 
b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java
index ed1306a..4ac3476 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/TestCompactionCoordinatorForOfflineTable.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.test;
 
 import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.accumulo.coordinator.CompactionFinalizer;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -38,10 +39,11 @@ public class TestCompactionCoordinatorForOfflineTable 
extends TestCompactionCoor
     private static final Logger LOG =
         LoggerFactory.getLogger(NonNotifyingCompactionFinalizer.class);
 
-    NonNotifyingCompactionFinalizer(ServerContext context) {
-      super(context);
+    NonNotifyingCompactionFinalizer(ServerContext context, 
ScheduledThreadPoolExecutor stpe) {
+      super(context, stpe);
     }
 
+    @Override
     public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, 
long fileSize,
         long fileEntries) {
 
@@ -63,8 +65,8 @@ public class TestCompactionCoordinatorForOfflineTable extends 
TestCompactionCoor
   }
 
   @Override
-  protected CompactionFinalizer createCompactionFinalizer() {
-    return new NonNotifyingCompactionFinalizer(getContext());
+  protected CompactionFinalizer 
createCompactionFinalizer(ScheduledThreadPoolExecutor stpe) {
+    return new NonNotifyingCompactionFinalizer(getContext(), stpe);
   }
 
   public static void main(String[] args) throws Exception {

Reply via email to