This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push:
     new 38d4bad  Got Hadoop metrics working for ext compactions
38d4bad is described below

commit 38d4badfc23acf5b858dc4bc53401f5d47452722
Author: Keith Turner <[email protected]>
AuthorDate: Tue Apr 27 15:48:20 2021 -0400

    Got Hadoop metrics working for ext compactions
---
 .../tserver/compactions/CompactionExecutor.java    |  2 +
 .../tserver/compactions/CompactionManager.java     | 82 +++++++++++++++++-----
 .../tserver/compactions/CompactionService.java     |  4 ++
 .../compactions/ExternalCompactionExecutor.java    | 69 ++++++++++--------
 .../compactions/InternalCompactionExecutor.java    | 19 +++++
 .../metrics/CompactionExecutorsMetrics.java        | 60 ++++++++++++++--
 .../accumulo/tserver/tablet/CompactableImpl.java   | 36 ++++++----
 7 files changed, 209 insertions(+), 63 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
index 20e8cfa..85581a8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.tserver.compactions;
 
 import java.util.function.Consumer;
 
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 
@@ -38,4 +39,5 @@ public interface CompactionExecutor {
 
   void stop();
 
+  void compactableClosed(KeyExtent extent);
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index be75ed6..ccf86b6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.tserver.compactions;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -77,7 +78,17 @@ public class CompactionManager {
 
   private Map<CompactionExecutorId,ExternalCompactionExecutor> 
externalExecutors;
 
-  private Map<ExternalCompactionId,KeyExtent> runningExternalCompactions;
+  private Map<ExternalCompactionId,ExtCompInfo> runningExternalCompactions;
+
+  static class ExtCompInfo {
+    final KeyExtent extent;
+    final CompactionExecutorId executor;
+
+    public ExtCompInfo(KeyExtent extent, CompactionExecutorId executor) {
+      this.extent = extent;
+      this.executor = executor;
+    }
+  }
 
   private class Config {
     Map<String,String> planners = new HashMap<>();
@@ -326,6 +337,8 @@ public class CompactionManager {
     this.services = Map.copyOf(tmpServices);
 
     this.maxTimeBetweenChecks = 
ctx.getConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY);
+
+    ceMetrics.setExternalMetricsSupplier(this::getExternalMetrics);
   }
 
   public void compactableChanged(Compactable compactable) {
@@ -428,7 +441,8 @@ public class CompactionManager {
     ExternalCompactionExecutor extCE = getExternalExecutor(queueName);
     var ecJob = extCE.reserveExternalCompaction(priority, compactorId, 
externalCompactionId);
     if (ecJob != null) {
-      runningExternalCompactions.put(ecJob.getExternalCompactionId(), 
ecJob.getExtent());
+      runningExternalCompactions.put(ecJob.getExternalCompactionId(),
+          new ExtCompInfo(ecJob.getExtent(), extCE.getId()));
       log.debug("Reserved external compaction {}", 
ecJob.getExternalCompactionId());
     }
     return ecJob;
@@ -442,18 +456,19 @@ public class CompactionManager {
     return getExternalExecutor(CompactionExecutorId.externalId(queueName));
   }
 
-  public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent 
externt) {
-    runningExternalCompactions.put(ecid, externt);
+  public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent 
extent,
+      CompactionExecutorId ceid) {
+    runningExternalCompactions.put(ecid, new ExtCompInfo(extent, ceid));
   }
 
   public void commitExternalCompaction(ExternalCompactionId extCompactionId,
       KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long 
fileSize,
       long entries) {
-    KeyExtent extent = runningExternalCompactions.get(extCompactionId);
-    if (extent != null) {
-      Preconditions.checkState(extent.equals(extentCompacted),
-          "Unexpected extent seen on compaction commit %s %s", extent, 
extentCompacted);
-      Tablet tablet = currentTablets.get(extent);
+    var ecInfo = runningExternalCompactions.get(extCompactionId);
+    if (ecInfo != null) {
+      Preconditions.checkState(ecInfo.extent.equals(extentCompacted),
+          "Unexpected extent seen on compaction commit %s %s", ecInfo.extent, 
extentCompacted);
+      Tablet tablet = currentTablets.get(ecInfo.extent);
       if (tablet != null) {
         tablet.asCompactable().commitExternalCompaction(extCompactionId, 
fileSize, entries);
         compactablesToCheck.add(tablet.asCompactable());
@@ -463,17 +478,17 @@ public class CompactionManager {
   }
 
   public boolean isRunningExternalCompaction(ExternalCompactionId eci, 
KeyExtent ke) {
-    KeyExtent extent = runningExternalCompactions.get(eci);
-    return (null != extent && extent.compareTo(ke) == 0);
+    var ecInfo = runningExternalCompactions.get(eci);
+    return (null != ecInfo && ecInfo.extent.compareTo(ke) == 0);
   }
 
   public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent 
extentCompacted,
       Map<KeyExtent,Tablet> currentTablets) {
-    KeyExtent extent = runningExternalCompactions.get(ecid);
-    if (extent != null) {
-      Preconditions.checkState(extent.equals(extentCompacted),
-          "Unexpected extent seen on compaction commit %s %s", extent, 
extentCompacted);
-      Tablet tablet = currentTablets.get(extent);
+    var ecInfo = runningExternalCompactions.get(ecid);
+    if (ecInfo != null) {
+      Preconditions.checkState(ecInfo.extent.equals(extentCompacted),
+          "Unexpected extent seen on compaction commit %s %s", ecInfo.extent, 
extentCompacted);
+      Tablet tablet = currentTablets.get(ecInfo.extent);
       if (tablet != null) {
         tablet.asCompactable().externalCompactionFailed(ecid);
         compactablesToCheck.add(tablet.asCompactable());
@@ -487,4 +502,39 @@ public class CompactionManager {
         .collect(Collectors.toList());
   }
 
+  public static class ExtCompMetric {
+    public CompactionExecutorId ceid;
+    public int running;
+    public int queued;
+  }
+
+  public Collection<ExtCompMetric> getExternalMetrics() {
+    Map<CompactionExecutorId,ExtCompMetric> metrics = new HashMap<>();
+
+    externalExecutors.forEach((eeid, ece) -> {
+      ExtCompMetric ecm = new ExtCompMetric();
+      ecm.ceid = eeid;
+      ecm.queued = ece.getCompactionsQueued(CType.EXTERNAL);
+      metrics.put(eeid, ecm);
+    });
+
+    runningExternalCompactions.values().forEach(eci -> {
+      var ecm = metrics.computeIfAbsent(eci.executor, id -> {
+        var newEcm = new ExtCompMetric();
+        newEcm.ceid = id;
+        return newEcm;
+      });
+
+      ecm.running++;
+    });
+
+    return metrics.values();
+  }
+
+  public void compactableClosed(KeyExtent extent, Set<CompactionServiceId> 
servicesUsed,
+      Set<ExternalCompactionId> ecids) {
+    runningExternalCompactions.keySet().removeAll(ecids);
+    servicesUsed.stream().map(services::get).filter(Objects::nonNull)
+        .forEach(compService -> compService.compactableClosed(extent));
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index 134c7d4..ee0d145 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -441,4 +441,8 @@ public class CompactionService {
       }
     });
   }
+
+  public void compactableClosed(KeyExtent extent) {
+    executors.values().forEach(compExecutor -> 
compExecutor.compactableClosed(extent));
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 6d7757c..a4f3041 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -19,12 +19,17 @@
 package org.apache.accumulo.tserver.compactions;
 
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
@@ -34,13 +39,18 @@ import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
 
 public class ExternalCompactionExecutor implements CompactionExecutor {
 
+  // This exist to provide an accurate count of queued compactions for metrics. The PriorityQueue is
+  // not used because its size may be off due to it containing cancelled compactions. The collection
+  // below should not contain cancelled compactions. A concurrent set was not used because those do
+  // not have constant time size operations.
   private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new 
HashSet<>());
 
-  private class ExternalJob extends SubmittedJob implements 
Comparable<ExternalJob> {
+  private class ExternalJob extends SubmittedJob {
     private AtomicReference<Status> status = new 
AtomicReference<>(Status.QUEUED);
     private Compactable compactable;
     private CompactionServiceId csid;
     private volatile ExternalCompactionId ecid;
+    private AtomicLong cancelCount = new AtomicLong();
 
     public ExternalJob(CompactionJob job, Compactable compactable, 
CompactionServiceId csid) {
       super(job);
@@ -69,48 +79,32 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
         if (canceled) {
           queuedTask.remove(this);
         }
+
+        if (canceled && cancelCount.incrementAndGet() % 1024 == 0) {
+          // Occasionally clean the queue of canceled tasks that have hung around because of their
+          // low priority. This runs periodically, instead of every time something is canceled, to
+          // avoid hurting performance.
+          queue.removeIf(ej -> ej.getStatus() == Status.CANCELED);
+        }
       }
 
       return canceled;
     }
 
-    @Override
-    public int compareTo(ExternalJob o) {
-      return Long.compare(o.getJob().getPriority(), getJob().getPriority());
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (null == obj) {
-        return false;
-      }
-      if (obj == this) {
-        return true;
-      }
-      if (obj instanceof ExternalJob) {
-        ExternalJob other = (ExternalJob) obj;
-        return (this.compareTo(other) == 0);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      return Long.hashCode(this.getJob().getPriority());
+    public KeyExtent getExtent() {
+      return compactable.getExtent();
     }
-
   }
 
   private PriorityBlockingQueue<ExternalJob> queue;
   private CompactionExecutorId ceid;
 
-  ExternalCompactionExecutor() {
-    queue = new PriorityBlockingQueue<ExternalJob>();
-  }
-
   public ExternalCompactionExecutor(CompactionExecutorId ceid) {
     this.ceid = ceid;
-    this.queue = new PriorityBlockingQueue<ExternalJob>();
+    Comparator<ExternalJob> comparator = Comparator.comparingLong(ej -> 
ej.getJob().getPriority());
+    comparator = comparator.reversed();
+
+    this.queue = new PriorityBlockingQueue<ExternalJob>(100, comparator);
   }
 
   @Override
@@ -187,4 +181,19 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
     return new TCompactionQueueSummary(ceid.getExernalName(), priority);
   }
 
+  public CompactionExecutorId getId() {
+    return ceid;
+  }
+
+  @Override
+  public void compactableClosed(KeyExtent extent) {
+    List<ExternalJob> taskToCancel;
+    synchronized (queuedTask) {
+      taskToCancel = queuedTask.stream().filter(ejob -> 
ejob.getExtent().equals(extent))
+          .collect(Collectors.toList());
+    }
+
+    taskToCancel.forEach(task -> task.cancel(Status.QUEUED));
+  }
+
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index 44e74e4..cc7b66d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.tserver.compactions;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.List;
 import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -29,13 +30,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
 import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
 import org.apache.htrace.wrappers.TraceRunnable;
 import org.slf4j.Logger;
@@ -136,6 +140,10 @@ public class InternalCompactionExecutor implements CompactionExecutor {
 
       return canceled;
     }
+
+    public KeyExtent getExtent() {
+      return compactable.getExtent();
+    }
   }
 
   private static CompactionJob getJob(Runnable r) {
@@ -221,4 +229,15 @@ public class InternalCompactionExecutor implements CompactionExecutor {
       log.warn("Failed to close metrics {}", ceid, e);
     }
   }
+
+  @Override
+  public void compactableClosed(KeyExtent extent) {
+    List<CompactionTask> taskToCancel;
+    synchronized (queuedTask) {
+      taskToCancel = queuedTask.stream().filter(ejob -> 
ejob.getExtent().equals(extent))
+          .collect(Collectors.toList());
+    }
+
+    taskToCancel.forEach(task -> task.cancel(Status.QUEUED));
+  }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
index 1fa8a16..3d7f221 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
@@ -18,19 +18,28 @@
  */
 package org.apache.accumulo.tserver.metrics;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.IntSupplier;
+import java.util.function.Supplier;
 
 import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.tserver.compactions.CompactionManager.ExtCompMetric;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 
+import com.google.common.collect.Sets;
+
 public class CompactionExecutorsMetrics extends TServerMetrics {
 
   private volatile List<CeMetrics> ceml = List.of();
   private Map<CompactionExecutorId,CeMetrics> metrics = new HashMap<>();
+  private Map<CompactionExecutorId,ExMetrics> exMetrics = new HashMap<>();
+  private volatile Supplier<Collection<ExtCompMetric>> externalMetricsSupplier;
 
   private static class CeMetrics {
     MutableGaugeInt queuedGauge;
@@ -40,6 +49,12 @@ public class CompactionExecutorsMetrics extends TServerMetrics {
     IntSupplier queuedSupplier;
   }
 
+  private static class ExMetrics {
+    MutableGaugeInt queuedGauge;
+    MutableGaugeInt runningGauge;
+
+  }
+
   public CompactionExecutorsMetrics() {
     super("compactionExecutors");
   }
@@ -52,10 +67,10 @@ public class CompactionExecutorsMetrics extends TServerMetrics {
     synchronized (metrics) {
       CeMetrics cem = metrics.computeIfAbsent(ceid, id -> {
         CeMetrics m = new CeMetrics();
-        m.queuedGauge = registry.newGauge(ceid.canonical().replace('.', '_') + 
"_queued",
-            "Queued compactions for executor " + ceid, 0);
-        m.runningGauge = registry.newGauge(ceid.canonical().replace('.', '_') 
+ "_running",
-            "Running compactions for executor " + ceid, 0);
+        m.queuedGauge = registry.newGauge(id.canonical().replace('.', '_') + 
"_queued",
+            "Queued compactions for executor " + id, 0);
+        m.runningGauge = registry.newGauge(id.canonical().replace('.', '_') + 
"_running",
+            "Running compactions for executor " + id, 0);
         return m;
       });
 
@@ -77,10 +92,47 @@ public class CompactionExecutorsMetrics extends TServerMetrics {
 
   @Override
   public void prepareMetrics() {
+
+    if (externalMetricsSupplier != null) {
+
+      Set<CompactionExecutorId> seenIds = new HashSet<>();
+
+      MetricsRegistry registry = super.getRegistry();
+
+      synchronized (exMetrics) {
+        externalMetricsSupplier.get().forEach(ecm -> {
+          seenIds.add(ecm.ceid);
+
+          ExMetrics exm = exMetrics.computeIfAbsent(ecm.ceid, id -> {
+            ExMetrics m = new ExMetrics();
+            m.queuedGauge = registry.newGauge(id.canonical().replace('.', '_') 
+ "_queued",
+                "Queued compactions for executor " + id, 0);
+            m.runningGauge = registry.newGauge(id.canonical().replace('.', 
'_') + "_running",
+                "Running compactions for executor " + id, 0);
+            return m;
+          });
+
+          exm.queuedGauge.set(ecm.queued);
+          exm.runningGauge.set(ecm.running);
+
+        });
+
+        Sets.difference(exMetrics.keySet(), seenIds).forEach(unusedId -> {
+          ExMetrics exm = exMetrics.get(unusedId);
+          exm.queuedGauge.set(0);
+          exm.runningGauge.set(0);
+        });
+
+      }
+    }
     ceml.forEach(cem -> {
       cem.runningGauge.set(cem.runningSupplier.getAsInt());
       cem.queuedGauge.set(cem.queuedSupplier.getAsInt());
     });
   }
 
+  public void setExternalMetricsSupplier(Supplier<Collection<ExtCompMetric>> 
ems) {
+    this.externalMetricsSupplier = ems;
+  }
+
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index f40ec6a..f419a4b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -34,6 +34,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -109,6 +110,8 @@ public class CompactableImpl implements Compactable {
 
   private Supplier<Set<CompactionServiceId>> servicesInUse;
 
+  private Set<CompactionServiceId> servicesUsed = new 
ConcurrentSkipListSet<>();
+
   // status of special compactions
   private enum SpecialStatus {
     NEW, SELECTING, SELECTED, NOT_ACTIVE, CANCELED
@@ -191,7 +194,7 @@ public class CompactableImpl implements Compactable {
 
         log.debug("Loaded tablet {} has existing external compaction {} {}", 
getExtent(), ecid,
             ecMeta);
-        manager.registerExternalCompaction(ecid, getExtent());
+        manager.registerExternalCompaction(ecid, getExtent(), 
ecMeta.getCompactionExecutorId());
       }
     });
 
@@ -631,6 +634,8 @@ public class CompactableImpl implements Compactable {
     if (!service.equals(getConfiguredService(kind)))
       return Optional.empty();
 
+    servicesUsed.add(service);
+
     var files = tablet.getDatafiles();
 
     // very important to call following outside of lock
@@ -1140,21 +1145,26 @@ public class CompactableImpl implements Compactable {
    * should be running and none should be able to start.
    */
   public synchronized void close() {
-    if (closed)
-      return;
+    synchronized (this) {
+      if (closed)
+        return;
 
-    closed = true;
+      closed = true;
 
-    // wait while internal jobs are running or external compactions are committing, but do not wait
-    // on external compactions that are running
-    while (runnningJobs.stream().anyMatch(job -> 
!job.getExecutor().isExernalId())
-        || !externalCompactionsCommitting.isEmpty()) {
-      try {
-        wait(50);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
+      // wait while internal jobs are running or external compactions are committing, but do not
+      // wait
+      // on external compactions that are running
+      while (runnningJobs.stream().anyMatch(job -> 
!job.getExecutor().isExernalId())
+          || !externalCompactionsCommitting.isEmpty()) {
+        try {
+          wait(50);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
       }
     }
+
+    manager.compactableClosed(getExtent(), servicesUsed, 
externalCompactions.keySet());
   }
 }

Reply via email to