This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new f3ece88  Get metrics for external compactions working
f3ece88 is described below

commit f3ece8801ebf027bb1e7b3b665bd14ae90012c6f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sun Apr 4 13:12:31 2021 -0400

    Get metrics for external compactions working
    
    This change provides external compaction info to the existing tserver
    compaction metrics code. In theory this should cause tservers to emit
    metrics for running and queued external compactions; however, it has not
    been tested yet.
    
    This change cleans up unused ExternalCompactionExecutors and
    ExternalCompaction information in the tserver.  This cleanup was needed to
    protect memory usage and support accurate metrics.
    
    Added strong sanity checks to the sets of files passed between a tablet and
    compaction planner. These checks exposed a transient problem that needs
    further investigation, but the problem does not prevent compactions.  A
    tablet is trying to pass bad information to a planner for a short time and
    the sanity checks prevented the information from being passed.
---
 .../accumulo/coordinator/CompactionFinalizer.java  | 16 ++++++---
 .../accumulo/tserver/compactions/Compactable.java  | 16 +++++++++
 .../tserver/compactions/CompactionExecutor.java    |  8 +++--
 .../tserver/compactions/CompactionManager.java     | 24 ++++++++++---
 .../tserver/compactions/CompactionService.java     | 24 +++++++++----
 .../compactions/ExternalCompactionExecutor.java    | 42 +++++++++++-----------
 .../compactions/InternalCompactionExecutor.java    |  8 +++--
 .../accumulo/tserver/tablet/CompactableImpl.java   |  6 ++++
 8 files changed, 102 insertions(+), 42 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index f7e19bc..a96f891 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -53,7 +53,8 @@ public class CompactionFinalizer {
   private static final Logger LOG = 
LoggerFactory.getLogger(CompactionFinalizer.class);
 
   private final ServerContext context;
-  private final ExecutorService executor;
+  private final ExecutorService ntfyExecutor;
+  private final ExecutorService backgroundExecutor;
   private final BlockingQueue<ExternalCompactionFinalState> 
pendingNotifications;
 
   CompactionFinalizer(ServerContext context) {
@@ -62,13 +63,17 @@ public class CompactionFinalizer {
     // CBUG configure thread factory
     // CBUG make pool size configurable?
 
-    this.executor = ThreadPools.createFixedThreadPool(3, 
"CompactionFinalizer", false);
+    this.ntfyExecutor =
+        ThreadPools.createFixedThreadPool(3, "Compaction Finalizer Notifyer", 
false);
 
-    executor.execute(() -> {
+    this.backgroundExecutor =
+        ThreadPools.createFixedThreadPool(2, "Compaction Finalizer Background 
Task", false);
+
+    backgroundExecutor.execute(() -> {
       processPending();
     });
 
-    executor.execute(() -> {
+    backgroundExecutor.execute(() -> {
       notifyTservers();
     });
   }
@@ -145,7 +150,8 @@ public class CompactionFinalizer {
           if (tabletMetadata != null && 
tabletMetadata.getExtent().equals(ecfs.getExtent())
               && tabletMetadata.getLocation() != null
               && tabletMetadata.getLocation().getType() == 
LocationType.CURRENT) {
-            futures.add(executor.submit(() -> 
notifyTserver(tabletMetadata.getLocation(), ecfs)));
+            futures
+                .add(ntfyExecutor.submit(() -> 
notifyTserver(tabletMetadata.getLocation(), ecfs)));
           } else {
             // this is an unknown tablet so need to delete its final state 
marker from metadata
             // table
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
index b1a920e..be70321 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
@@ -38,6 +39,8 @@ import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Interface between compaction service and tablet.
  */
@@ -66,6 +69,17 @@ public interface Compactable {
           .map(stf -> new CompactableFileImpl(stf, 
allFiles.get(stf))).collect(Collectors.toSet()));
 
       this.compacting = Set.copyOf(running);
+
+      // Do some sanity checks on sets of files
+      Preconditions.checkArgument(this.allFiles.containsAll(this.candidates),
+          "Candidates not in set of all files %s %s", this.allFiles, 
this.candidates);
+      var compactingFiles =
+          compacting.stream().flatMap(job -> 
job.getFiles().stream()).collect(Collectors.toSet());
+      Preconditions.checkArgument(this.allFiles.containsAll(compactingFiles),
+          "Compacting not in set of all files %s %s", this.allFiles, 
compactingFiles);
+      Preconditions.checkArgument(Collections.disjoint(compactingFiles, 
this.candidates),
+          "Compacting and candidates overlap %s %s", compactingFiles, 
this.candidates);
+
       this.executionHints = executionHints;
     }
 
@@ -98,4 +112,6 @@ public interface Compactable {
   void externalCompactionFailed(ExternalCompactionId ecid);
 
   boolean isActive(ExternalCompactionId ecid);
+
+  void getExternalCompactionIds(Consumer<ExternalCompactionId> tmp);
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
index 974d1f3..20e8cfa 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
@@ -28,9 +28,13 @@ public interface CompactionExecutor {
   SubmittedJob submit(CompactionServiceId csid, CompactionJob job, Compactable 
compactable,
       Consumer<Compactable> completionCallback);
 
-  int getCompactionsRunning();
+  enum CType {
+    INTERNAL, EXTERNAL
+  }
 
-  int getCompactionsQueued();
+  int getCompactionsRunning(CType ctype);
+
+  int getCompactionsQueued(CType ctype);
 
   void stop();
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index e68294a..5584b61 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -19,6 +19,7 @@
 package org.apache.accumulo.tserver.compactions;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -42,6 +43,7 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.fate.util.Retry;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.tserver.compactions.CompactionExecutor.CType;
 import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
 import org.apache.accumulo.tserver.tablet.Tablet;
 import org.slf4j.Logger;
@@ -74,7 +76,6 @@ public class CompactionManager {
 
   private Map<CompactionExecutorId,ExternalCompactionExecutor> 
externalExecutors;
 
-  // TODO this may need to be garbage collected... also will need to be 
populated when tablet load
   private Map<ExternalCompactionId,KeyExtent> runningExternalCompactions;
 
   private class Config {
@@ -235,11 +236,15 @@ public class CompactionManager {
         long passed = TimeUnit.MILLISECONDS.convert(System.nanoTime() - 
lastCheckAllTime,
             TimeUnit.NANOSECONDS);
         if (passed >= maxTimeBetweenChecks) {
+          HashSet<ExternalCompactionId> observedEcids = new HashSet<>();
           for (Compactable compactable : compactables) {
             last = compactable;
             compact(compactable);
+            compactable.getExternalCompactionIds(observedEcids::add);
           }
           lastCheckAllTime = System.nanoTime();
+          // clean up any external compactions that are not currently running
+          runningExternalCompactions.keySet().retainAll(observedEcids);
         } else {
           var compactable =
               compactablesToCheck.poll(maxTimeBetweenChecks - passed, 
TimeUnit.MILLISECONDS);
@@ -370,6 +375,12 @@ public class CompactionManager {
         }
 
         this.services = Map.copyOf(tmpServices);
+
+        HashSet<CompactionExecutorId> activeExternalExecs = new HashSet<>();
+        services.values().forEach(cs -> 
cs.getExternalExecutorsInUse(activeExternalExecs::add));
+        // clean up an external compactors that are no longer in use by any 
compaction service
+        externalExecutors.keySet().retainAll(activeExternalExecs);
+
       }
     } catch (RuntimeException e) {
       log.error("Failed to reconfigure compaction services ", e);
@@ -398,14 +409,16 @@ public class CompactionManager {
   }
 
   public int getCompactionsRunning() {
-    return 
services.values().stream().mapToInt(CompactionService::getCompactionsRunning).sum();
+    return services.values().stream().mapToInt(cs -> 
cs.getCompactionsRunning(CType.INTERNAL)).sum()
+        + runningExternalCompactions.size();
   }
 
   public int getCompactionsQueued() {
-    return 
services.values().stream().mapToInt(CompactionService::getCompactionsQueued).sum();
+    return services.values().stream().mapToInt(cs -> 
cs.getCompactionsQueued(CType.INTERNAL)).sum()
+        + externalExecutors.values().stream()
+            .mapToInt(ee -> ee.getCompactionsQueued(CType.EXTERNAL)).sum();
   }
 
-  // CBUG would be nice to create a CompactorId type and use that instead of 
string.
   public ExternalCompactionJob reserveExternalCompaction(String queueName, 
long priority,
       String compactorId, ExternalCompactionId externalCompactionId) {
     log.debug("Attempting to reserve external compaction, queue:{} priority:{} 
compactor:{}",
@@ -439,6 +452,7 @@ public class CompactionManager {
       Tablet tablet = currentTablets.get(extent);
       if (tablet != null) {
         tablet.asCompactable().commitExternalCompaction(extCompactionId, 
fileSize, entries);
+        compactablesToCheck.add(tablet.asCompactable());
       }
       runningExternalCompactions.remove(extCompactionId);
     }
@@ -456,13 +470,13 @@ public class CompactionManager {
       Tablet tablet = currentTablets.get(extent);
       if (tablet != null) {
         tablet.asCompactable().externalCompactionFailed(ecid);
+        compactablesToCheck.add(tablet.asCompactable());
       }
       runningExternalCompactions.remove(ecid);
     }
   }
 
   public List<TCompactionQueueSummary> getCompactionQueueSummaries() {
-    // TODO Auto-generated method stub
     return externalExecutors.values().stream().map(ece -> ece.summarize())
         .collect(Collectors.toList());
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index 6ec0831..134c7d4 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -58,6 +58,7 @@ import 
org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServiceEnvironmentImpl;
+import org.apache.accumulo.tserver.compactions.CompactionExecutor.CType;
 import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
 import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
 import org.slf4j.Logger;
@@ -70,8 +71,6 @@ public class CompactionService {
   private CompactionPlanner planner;
   private Map<CompactionExecutorId,CompactionExecutor> executors;
   private final CompactionServiceId myId;
-  // CBUG does this need to be populated w/ external compactions when a tablet 
is loaded OR should
-  // this even contain external compactions?
   private Map<KeyExtent,Collection<SubmittedJob>> submittedJobs = new 
ConcurrentHashMap<>();
   private ServerContext serverCtx;
   private String plannerClassName;
@@ -210,6 +209,10 @@ public class CompactionService {
           }
         }
       } else if (status == Status.RUNNING) {
+        // Note the submitted jobs set may not contain external compactions 
that started on another
+        // tserver. However, this is ok as the purpose of this check is to 
look for compaction jobs
+        // that transitioned from QUEUED to RUNNING during planning. Any 
external compactions
+        // started on another tserver will not make this transition during 
planning.
         for (CompactionJob job : jobs) {
           if (!Collections.disjoint(submittedJob.getJob().getFiles(), 
job.getFiles())) {
             return false;
@@ -406,7 +409,6 @@ public class CompactionService {
     });
 
     Sets.difference(executors.keySet(), tmpExecutors.keySet()).forEach(ceid -> 
{
-      // TODO may not make sense for external
       executors.get(ceid).stop();
     });
 
@@ -424,11 +426,19 @@ public class CompactionService {
     executors.values().forEach(CompactionExecutor::stop);
   }
 
-  int getCompactionsRunning() {
-    return 
executors.values().stream().mapToInt(CompactionExecutor::getCompactionsRunning).sum();
+  int getCompactionsRunning(CType ctype) {
+    return executors.values().stream().mapToInt(ce -> 
ce.getCompactionsRunning(ctype)).sum();
   }
 
-  int getCompactionsQueued() {
-    return 
executors.values().stream().mapToInt(CompactionExecutor::getCompactionsQueued).sum();
+  int getCompactionsQueued(CType ctype) {
+    return executors.values().stream().mapToInt(ce -> 
ce.getCompactionsQueued(ctype)).sum();
+  }
+
+  public void getExternalExecutorsInUse(Consumer<CompactionExecutorId> 
idConsumer) {
+    executors.forEach((ceid, ce) -> {
+      if (ce instanceof ExternalCompactionExecutor) {
+        idConsumer.accept(ceid);
+      }
+    });
   }
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 944dc9c..7785b19 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -18,6 +18,9 @@
  */
 package org.apache.accumulo.tserver.compactions;
 
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -31,21 +34,19 @@ import 
org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
 
 public class ExternalCompactionExecutor implements CompactionExecutor {
 
+  private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new 
HashSet<>());
+
   private class ExternalJob extends SubmittedJob implements 
Comparable<ExternalJob> {
     private AtomicReference<Status> status = new 
AtomicReference<>(Status.QUEUED);
     private Compactable compactable;
     private CompactionServiceId csid;
-    private Consumer<Compactable> completionCallback;
-    private final long queuedTime;
     private volatile ExternalCompactionId ecid;
 
-    public ExternalJob(CompactionJob job, Compactable compactable, 
CompactionServiceId csid,
-        Consumer<Compactable> completionCallback) {
+    public ExternalJob(CompactionJob job, Compactable compactable, 
CompactionServiceId csid) {
       super(job);
       this.compactable = compactable;
       this.csid = csid;
-      this.completionCallback = completionCallback;
-      queuedTime = System.currentTimeMillis();
+      queuedTask.add(this);
     }
 
     @Override
@@ -65,6 +66,9 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
 
       if (expectedStatus == Status.QUEUED) {
         canceled = status.compareAndSet(expectedStatus, Status.CANCELED);
+        if (canceled) {
+          queuedTask.remove(this);
+        }
       }
 
       return canceled;
@@ -92,28 +96,27 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
   @Override
   public SubmittedJob submit(CompactionServiceId csid, CompactionJob job, 
Compactable compactable,
       Consumer<Compactable> completionCallback) {
-    ExternalJob extJob = new ExternalJob(job, compactable, csid, 
completionCallback);
+    ExternalJob extJob = new ExternalJob(job, compactable, csid);
     queue.add(extJob);
     return extJob;
   }
 
   @Override
-  public int getCompactionsRunning() {
-    // TODO Auto-generated method stub
+  public int getCompactionsRunning(CType ctype) {
+    if (ctype == CType.EXTERNAL)
+      throw new UnsupportedOperationException();
     return 0;
   }
 
   @Override
-  public int getCompactionsQueued() {
-    // TODO Auto-generated method stub
-    return 0;
+  public int getCompactionsQueued(CType ctype) {
+    if (ctype != CType.EXTERNAL)
+      return 0;
+    return queuedTask.size();
   }
 
   @Override
-  public void stop() {
-    // TODO Auto-generated method stub
-
-  }
+  public void stop() {}
 
   ExternalCompactionJob reserveExternalCompaction(long priority, String 
compactorId,
       ExternalCompactionId externalCompactionId) {
@@ -132,6 +135,7 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
         var ecj = extJob.compactable.reserveExternalCompaction(extJob.csid, 
extJob.getJob(),
             compactorId, externalCompactionId);
         extJob.ecid = ecj.getExternalCompactionId();
+        queuedTask.remove(extJob);
         return ecj;
       } else {
         // TODO could this cause a stack overflow?
@@ -142,16 +146,12 @@ public class ExternalCompactionExecutor implements 
CompactionExecutor {
       queue.add(extJob);
     }
 
-    // TODO Auto-generated method stub
     return null;
   }
 
   // TODO maybe create non-thrift type to avoid thrift types all over the code
   public TCompactionQueueSummary summarize() {
-    // TODO maybe try to keep this precomputed to avoid looping over entire 
queue for each request
-    // TODO if count is not needed would not even need to loop over entire 
queue
-    // TODO cast to int is problematic
-    int count = (int) queue.stream().filter(extJob -> extJob.status.get() == 
Status.QUEUED).count();
+    int count = queuedTask.size();
 
     long priority = 0;
     ExternalJob topJob = queue.peek();
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index ace93d8..44e74e4 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -198,12 +198,16 @@ public class InternalCompactionExecutor implements 
CompactionExecutor {
   }
 
   @Override
-  public int getCompactionsRunning() {
+  public int getCompactionsRunning(CType ctype) {
+    if (ctype != CType.INTERNAL)
+      return 0;
     return threadPool.getActiveCount();
   }
 
   @Override
-  public int getCompactionsQueued() {
+  public int getCompactionsQueued(CType ctype) {
+    if (ctype != CType.INTERNAL)
+      return 0;
     return queuedTask.size();
   }
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 534c9b7..97175de 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -33,6 +33,7 @@ import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -858,6 +859,11 @@ public class CompactableImpl implements Compactable {
   }
 
   @Override
+  public void getExternalCompactionIds(Consumer<ExternalCompactionId> 
idConsumer) {
+    externalCompactions.forEach((ecid, eci) -> idConsumer.accept(ecid));
+  }
+
+  @Override
   public CompactionServiceId getConfiguredService(CompactionKind kind) {
 
     Map<String,String> debugHints = null;

Reply via email to