This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to
refs/heads/1451-external-compactions-feature by this push:
new 38d4bad Got Hadoop metrics working for ext compactions
38d4bad is described below
commit 38d4badfc23acf5b858dc4bc53401f5d47452722
Author: Keith Turner <[email protected]>
AuthorDate: Tue Apr 27 15:48:20 2021 -0400
Got Hadoop metrics working for ext compactions
---
.../tserver/compactions/CompactionExecutor.java | 2 +
.../tserver/compactions/CompactionManager.java | 82 +++++++++++++++++-----
.../tserver/compactions/CompactionService.java | 4 ++
.../compactions/ExternalCompactionExecutor.java | 69 ++++++++++--------
.../compactions/InternalCompactionExecutor.java | 19 +++++
.../metrics/CompactionExecutorsMetrics.java | 60 ++++++++++++++--
.../accumulo/tserver/tablet/CompactableImpl.java | 36 ++++++----
7 files changed, 209 insertions(+), 63 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
index 20e8cfa..85581a8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.tserver.compactions;
import java.util.function.Consumer;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
@@ -38,4 +39,5 @@ public interface CompactionExecutor {
void stop();
+ void compactableClosed(KeyExtent extent);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index be75ed6..ccf86b6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.tserver.compactions;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -77,7 +78,17 @@ public class CompactionManager {
private Map<CompactionExecutorId,ExternalCompactionExecutor>
externalExecutors;
- private Map<ExternalCompactionId,KeyExtent> runningExternalCompactions;
+ private Map<ExternalCompactionId,ExtCompInfo> runningExternalCompactions;
+
+ static class ExtCompInfo {
+ final KeyExtent extent;
+ final CompactionExecutorId executor;
+
+ public ExtCompInfo(KeyExtent extent, CompactionExecutorId executor) {
+ this.extent = extent;
+ this.executor = executor;
+ }
+ }
private class Config {
Map<String,String> planners = new HashMap<>();
@@ -326,6 +337,8 @@ public class CompactionManager {
this.services = Map.copyOf(tmpServices);
this.maxTimeBetweenChecks =
ctx.getConfiguration().getTimeInMillis(Property.TSERV_MAJC_DELAY);
+
+ ceMetrics.setExternalMetricsSupplier(this::getExternalMetrics);
}
public void compactableChanged(Compactable compactable) {
@@ -428,7 +441,8 @@ public class CompactionManager {
ExternalCompactionExecutor extCE = getExternalExecutor(queueName);
var ecJob = extCE.reserveExternalCompaction(priority, compactorId,
externalCompactionId);
if (ecJob != null) {
- runningExternalCompactions.put(ecJob.getExternalCompactionId(),
ecJob.getExtent());
+ runningExternalCompactions.put(ecJob.getExternalCompactionId(),
+ new ExtCompInfo(ecJob.getExtent(), extCE.getId()));
log.debug("Reserved external compaction {}",
ecJob.getExternalCompactionId());
}
return ecJob;
@@ -442,18 +456,19 @@ public class CompactionManager {
return getExternalExecutor(CompactionExecutorId.externalId(queueName));
}
- public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent
externt) {
- runningExternalCompactions.put(ecid, externt);
+ public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent
extent,
+ CompactionExecutorId ceid) {
+ runningExternalCompactions.put(ecid, new ExtCompInfo(extent, ceid));
}
public void commitExternalCompaction(ExternalCompactionId extCompactionId,
KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long
fileSize,
long entries) {
- KeyExtent extent = runningExternalCompactions.get(extCompactionId);
- if (extent != null) {
- Preconditions.checkState(extent.equals(extentCompacted),
- "Unexpected extent seen on compaction commit %s %s", extent,
extentCompacted);
- Tablet tablet = currentTablets.get(extent);
+ var ecInfo = runningExternalCompactions.get(extCompactionId);
+ if (ecInfo != null) {
+ Preconditions.checkState(ecInfo.extent.equals(extentCompacted),
+ "Unexpected extent seen on compaction commit %s %s", ecInfo.extent,
extentCompacted);
+ Tablet tablet = currentTablets.get(ecInfo.extent);
if (tablet != null) {
tablet.asCompactable().commitExternalCompaction(extCompactionId,
fileSize, entries);
compactablesToCheck.add(tablet.asCompactable());
@@ -463,17 +478,17 @@ public class CompactionManager {
}
public boolean isRunningExternalCompaction(ExternalCompactionId eci,
KeyExtent ke) {
- KeyExtent extent = runningExternalCompactions.get(eci);
- return (null != extent && extent.compareTo(ke) == 0);
+ var ecInfo = runningExternalCompactions.get(eci);
+ return (null != ecInfo && ecInfo.extent.compareTo(ke) == 0);
}
public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent
extentCompacted,
Map<KeyExtent,Tablet> currentTablets) {
- KeyExtent extent = runningExternalCompactions.get(ecid);
- if (extent != null) {
- Preconditions.checkState(extent.equals(extentCompacted),
- "Unexpected extent seen on compaction commit %s %s", extent,
extentCompacted);
- Tablet tablet = currentTablets.get(extent);
+ var ecInfo = runningExternalCompactions.get(ecid);
+ if (ecInfo != null) {
+ Preconditions.checkState(ecInfo.extent.equals(extentCompacted),
+ "Unexpected extent seen on compaction commit %s %s", ecInfo.extent,
extentCompacted);
+ Tablet tablet = currentTablets.get(ecInfo.extent);
if (tablet != null) {
tablet.asCompactable().externalCompactionFailed(ecid);
compactablesToCheck.add(tablet.asCompactable());
@@ -487,4 +502,39 @@ public class CompactionManager {
.collect(Collectors.toList());
}
+ public static class ExtCompMetric {
+ public CompactionExecutorId ceid;
+ public int running;
+ public int queued;
+ }
+
+ public Collection<ExtCompMetric> getExternalMetrics() {
+ Map<CompactionExecutorId,ExtCompMetric> metrics = new HashMap<>();
+
+ externalExecutors.forEach((eeid, ece) -> {
+ ExtCompMetric ecm = new ExtCompMetric();
+ ecm.ceid = eeid;
+ ecm.queued = ece.getCompactionsQueued(CType.EXTERNAL);
+ metrics.put(eeid, ecm);
+ });
+
+ runningExternalCompactions.values().forEach(eci -> {
+ var ecm = metrics.computeIfAbsent(eci.executor, id -> {
+ var newEcm = new ExtCompMetric();
+ newEcm.ceid = id;
+ return newEcm;
+ });
+
+ ecm.running++;
+ });
+
+ return metrics.values();
+ }
+
+ public void compactableClosed(KeyExtent extent, Set<CompactionServiceId>
servicesUsed,
+ Set<ExternalCompactionId> ecids) {
+ runningExternalCompactions.keySet().removeAll(ecids);
+ servicesUsed.stream().map(services::get).filter(Objects::nonNull)
+ .forEach(compService -> compService.compactableClosed(extent));
+ }
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
index 134c7d4..ee0d145 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
@@ -441,4 +441,8 @@ public class CompactionService {
}
});
}
+
+ public void compactableClosed(KeyExtent extent) {
+ executors.values().forEach(compExecutor ->
compExecutor.compactableClosed(extent));
+ }
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
index 6d7757c..a4f3041 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionExecutor.java
@@ -19,12 +19,17 @@
package org.apache.accumulo.tserver.compactions;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
@@ -34,13 +39,18 @@ import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
public class ExternalCompactionExecutor implements CompactionExecutor {
+ // This exist to provide an accurate count of queued compactions for
metrics. The PriorityQueue is
+ // not used because its size may be off due to it containing cancelled
compactions. The collection
+ // below should not contain cancelled compactions. A concurrent set was not
used because those do
+ // not have constant time size operations.
private Set<ExternalJob> queuedTask = Collections.synchronizedSet(new
HashSet<>());
- private class ExternalJob extends SubmittedJob implements
Comparable<ExternalJob> {
+ private class ExternalJob extends SubmittedJob {
private AtomicReference<Status> status = new
AtomicReference<>(Status.QUEUED);
private Compactable compactable;
private CompactionServiceId csid;
private volatile ExternalCompactionId ecid;
+ private AtomicLong cancelCount = new AtomicLong();
public ExternalJob(CompactionJob job, Compactable compactable,
CompactionServiceId csid) {
super(job);
@@ -69,48 +79,32 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
if (canceled) {
queuedTask.remove(this);
}
+
+ if (canceled && cancelCount.incrementAndGet() % 1024 == 0) {
+ // Occasionally clean the queue of canceled tasks that have hung
around because of their
+ // low priority. This runs periodically, instead of every time
something is canceled, to
+ // avoid hurting performance.
+ queue.removeIf(ej -> ej.getStatus() == Status.CANCELED);
+ }
}
return canceled;
}
- @Override
- public int compareTo(ExternalJob o) {
- return Long.compare(o.getJob().getPriority(), getJob().getPriority());
- }
-
- @Override
- public boolean equals(Object obj) {
- if (null == obj) {
- return false;
- }
- if (obj == this) {
- return true;
- }
- if (obj instanceof ExternalJob) {
- ExternalJob other = (ExternalJob) obj;
- return (this.compareTo(other) == 0);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- return Long.hashCode(this.getJob().getPriority());
+ public KeyExtent getExtent() {
+ return compactable.getExtent();
}
-
}
private PriorityBlockingQueue<ExternalJob> queue;
private CompactionExecutorId ceid;
- ExternalCompactionExecutor() {
- queue = new PriorityBlockingQueue<ExternalJob>();
- }
-
public ExternalCompactionExecutor(CompactionExecutorId ceid) {
this.ceid = ceid;
- this.queue = new PriorityBlockingQueue<ExternalJob>();
+ Comparator<ExternalJob> comparator = Comparator.comparingLong(ej ->
ej.getJob().getPriority());
+ comparator = comparator.reversed();
+
+ this.queue = new PriorityBlockingQueue<ExternalJob>(100, comparator);
}
@Override
@@ -187,4 +181,19 @@ public class ExternalCompactionExecutor implements CompactionExecutor {
return new TCompactionQueueSummary(ceid.getExernalName(), priority);
}
+ public CompactionExecutorId getId() {
+ return ceid;
+ }
+
+ @Override
+ public void compactableClosed(KeyExtent extent) {
+ List<ExternalJob> taskToCancel;
+ synchronized (queuedTask) {
+ taskToCancel = queuedTask.stream().filter(ejob ->
ejob.getExtent().equals(extent))
+ .collect(Collectors.toList());
+ }
+
+ taskToCancel.forEach(task -> task.cancel(Status.QUEUED));
+ }
+
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index 44e74e4..cc7b66d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.tserver.compactions;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
+import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.PriorityBlockingQueue;
@@ -29,13 +30,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
import org.apache.htrace.wrappers.TraceRunnable;
import org.slf4j.Logger;
@@ -136,6 +140,10 @@ public class InternalCompactionExecutor implements CompactionExecutor {
return canceled;
}
+
+ public KeyExtent getExtent() {
+ return compactable.getExtent();
+ }
}
private static CompactionJob getJob(Runnable r) {
@@ -221,4 +229,15 @@ public class InternalCompactionExecutor implements CompactionExecutor {
log.warn("Failed to close metrics {}", ceid, e);
}
}
+
+ @Override
+ public void compactableClosed(KeyExtent extent) {
+ List<CompactionTask> taskToCancel;
+ synchronized (queuedTask) {
+ taskToCancel = queuedTask.stream().filter(ejob ->
ejob.getExtent().equals(extent))
+ .collect(Collectors.toList());
+ }
+
+ taskToCancel.forEach(task -> task.cancel(Status.QUEUED));
+ }
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
index 1fa8a16..3d7f221 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/CompactionExecutorsMetrics.java
@@ -18,19 +18,28 @@
*/
package org.apache.accumulo.tserver.metrics;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.IntSupplier;
+import java.util.function.Supplier;
import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.tserver.compactions.CompactionManager.ExtCompMetric;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import com.google.common.collect.Sets;
+
public class CompactionExecutorsMetrics extends TServerMetrics {
private volatile List<CeMetrics> ceml = List.of();
private Map<CompactionExecutorId,CeMetrics> metrics = new HashMap<>();
+ private Map<CompactionExecutorId,ExMetrics> exMetrics = new HashMap<>();
+ private volatile Supplier<Collection<ExtCompMetric>> externalMetricsSupplier;
private static class CeMetrics {
MutableGaugeInt queuedGauge;
@@ -40,6 +49,12 @@ public class CompactionExecutorsMetrics extends TServerMetrics {
IntSupplier queuedSupplier;
}
+ private static class ExMetrics {
+ MutableGaugeInt queuedGauge;
+ MutableGaugeInt runningGauge;
+
+ }
+
public CompactionExecutorsMetrics() {
super("compactionExecutors");
}
@@ -52,10 +67,10 @@ public class CompactionExecutorsMetrics extends TServerMetrics {
synchronized (metrics) {
CeMetrics cem = metrics.computeIfAbsent(ceid, id -> {
CeMetrics m = new CeMetrics();
- m.queuedGauge = registry.newGauge(ceid.canonical().replace('.', '_') +
"_queued",
- "Queued compactions for executor " + ceid, 0);
- m.runningGauge = registry.newGauge(ceid.canonical().replace('.', '_')
+ "_running",
- "Running compactions for executor " + ceid, 0);
+ m.queuedGauge = registry.newGauge(id.canonical().replace('.', '_') +
"_queued",
+ "Queued compactions for executor " + id, 0);
+ m.runningGauge = registry.newGauge(id.canonical().replace('.', '_') +
"_running",
+ "Running compactions for executor " + id, 0);
return m;
});
@@ -77,10 +92,47 @@ public class CompactionExecutorsMetrics extends TServerMetrics {
@Override
public void prepareMetrics() {
+
+ if (externalMetricsSupplier != null) {
+
+ Set<CompactionExecutorId> seenIds = new HashSet<>();
+
+ MetricsRegistry registry = super.getRegistry();
+
+ synchronized (exMetrics) {
+ externalMetricsSupplier.get().forEach(ecm -> {
+ seenIds.add(ecm.ceid);
+
+ ExMetrics exm = exMetrics.computeIfAbsent(ecm.ceid, id -> {
+ ExMetrics m = new ExMetrics();
+ m.queuedGauge = registry.newGauge(id.canonical().replace('.', '_')
+ "_queued",
+ "Queued compactions for executor " + id, 0);
+ m.runningGauge = registry.newGauge(id.canonical().replace('.',
'_') + "_running",
+ "Running compactions for executor " + id, 0);
+ return m;
+ });
+
+ exm.queuedGauge.set(ecm.queued);
+ exm.runningGauge.set(ecm.running);
+
+ });
+
+ Sets.difference(exMetrics.keySet(), seenIds).forEach(unusedId -> {
+ ExMetrics exm = exMetrics.get(unusedId);
+ exm.queuedGauge.set(0);
+ exm.runningGauge.set(0);
+ });
+
+ }
+ }
ceml.forEach(cem -> {
cem.runningGauge.set(cem.runningSupplier.getAsInt());
cem.queuedGauge.set(cem.queuedSupplier.getAsInt());
});
}
+ public void setExternalMetricsSupplier(Supplier<Collection<ExtCompMetric>>
ems) {
+ this.externalMetricsSupplier = ems;
+ }
+
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index f40ec6a..f419a4b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -34,6 +34,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -109,6 +110,8 @@ public class CompactableImpl implements Compactable {
private Supplier<Set<CompactionServiceId>> servicesInUse;
+ private Set<CompactionServiceId> servicesUsed = new
ConcurrentSkipListSet<>();
+
// status of special compactions
private enum SpecialStatus {
NEW, SELECTING, SELECTED, NOT_ACTIVE, CANCELED
@@ -191,7 +194,7 @@ public class CompactableImpl implements Compactable {
log.debug("Loaded tablet {} has existing external compaction {} {}",
getExtent(), ecid,
ecMeta);
- manager.registerExternalCompaction(ecid, getExtent());
+ manager.registerExternalCompaction(ecid, getExtent(),
ecMeta.getCompactionExecutorId());
}
});
@@ -631,6 +634,8 @@ public class CompactableImpl implements Compactable {
if (!service.equals(getConfiguredService(kind)))
return Optional.empty();
+ servicesUsed.add(service);
+
var files = tablet.getDatafiles();
// very important to call following outside of lock
@@ -1140,21 +1145,26 @@ public class CompactableImpl implements Compactable {
* should be running and none should be able to start.
*/
public synchronized void close() {
- if (closed)
- return;
+ synchronized (this) {
+ if (closed)
+ return;
- closed = true;
+ closed = true;
- // wait while internal jobs are running or external compactions are
committing, but do not wait
- // on external compactions that are running
- while (runnningJobs.stream().anyMatch(job ->
!job.getExecutor().isExernalId())
- || !externalCompactionsCommitting.isEmpty()) {
- try {
- wait(50);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ // wait while internal jobs are running or external compactions are
committing, but do not
+ // wait
+ // on external compactions that are running
+ while (runnningJobs.stream().anyMatch(job ->
!job.getExecutor().isExernalId())
+ || !externalCompactionsCommitting.isEmpty()) {
+ try {
+ wait(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
}
}
+
+ manager.compactableClosed(getExtent(), servicesUsed,
externalCompactions.keySet());
}
}