This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 6e745d16fa Add Metrics for CompactionJobPriority Queues (#3551) 6e745d16fa is described below commit 6e745d16fa40524b60f63adc2873ab1f42b2454a Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Wed Jul 19 22:38:35 2023 -0400 Add Metrics for CompactionJobPriority Queues (#3551) Adds the following metrics for CompactionJobPriority: * Number of Queues * Queue Size * Number of Queued Jobs * Number of Dequeued Jobs * Number of Rejected Jobs * Lowest Job Priority Metric Other changes in commit * Adds configurable property to change CompactionQueue size and test rejectedJobs. See #3635 for more details. * Switches metric names to dot notation * Adds CompactionPriorityQueueMetricsIT * Adds User defined CompactionResourceGroup override to MAC --- .../org/apache/accumulo/core/conf/Property.java | 3 + .../accumulo/core/metrics/MetricsProducer.java | 55 ++- .../apache/accumulo/core/metrics/MetricsUtil.java | 30 ++ .../accumulo/core/metrics/MetricsUtilTest.java | 43 +++ .../miniclusterImpl/MiniAccumuloClusterImpl.java | 5 +- .../miniclusterImpl/MiniAccumuloConfigImpl.java | 2 + .../java/org/apache/accumulo/manager/Manager.java | 3 +- .../queue/CompactionJobPriorityQueue.java | 28 +- .../compaction/queue/CompactionJobQueues.java | 46 ++- .../accumulo/manager/metrics/ManagerMetrics.java | 3 + .../accumulo/manager/metrics/QueueMetrics.java | 100 ++++++ .../CompactionPriorityQueueMetricsIT.java | 397 +++++++++++++++++++++ 12 files changed, 707 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 3fca886afc..8cdf429882 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -373,6 
+373,9 @@ public enum Property { "1.10.0"), MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", "8", PropertyType.COUNT, "The number of threads used to inspect tablets files to find split points.", "4.0.0"), + + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size", + "10000", PropertyType.COUNT, "The max size of the priority queue", "4.0"), // properties that are specific to scan server behavior @Experimental SSERV_PREFIX("sserver.", null, PropertyType.PREFIX, diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 322b236e58..820a432bd2 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -48,7 +48,7 @@ import io.micrometer.core.instrument.MeterRegistry; * <td>N/A</td> * <td>N/A</td> * <td>{@link #METRICS_LOW_MEMORY}</td> - * <td>Guage</td> + * <td>Gauge</td> * <td>reports 1 when process memory usage is above threshold, 0 when memory is okay</td> * </tr> * <tr> @@ -59,6 +59,48 @@ import io.micrometer.core.instrument.MeterRegistry; * <td></td> * </tr> * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUES}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED}</td> + * <td>Gauge</td> 
+ * <td></td> + * </tr> + * <tr> + * <td>N/A</td> + * <td>N/A</td> + * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED}</td> + * <td>Gauge</td> + * <td></td> + * </tr> + * <tr> * <td>currentFateOps</td> * <td>Gauge</td> * <td>{@link #METRICS_FATE_TOTAL_IN_PROGRESS}</td> @@ -597,6 +639,17 @@ public interface MetricsProducer { String METRICS_LOW_MEMORY = "accumulo.detected.low.memory"; String METRICS_COMPACTOR_PREFIX = "accumulo.compactor."; String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + "majc.stuck"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = METRICS_COMPACTOR_PREFIX + "queue.count"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = METRICS_COMPACTOR_PREFIX + "queue.length"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED = + METRICS_COMPACTOR_PREFIX + "queue.jobs.dequeued"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED = + METRICS_COMPACTOR_PREFIX + "queue.jobs.queued"; + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED = + METRICS_COMPACTOR_PREFIX + "queue.jobs.rejected"; + + String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY = + METRICS_COMPACTOR_PREFIX + "queue.jobs.priority"; String METRICS_FATE_PREFIX = "accumulo.fate."; String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + "ops.in_progress_by_type"; diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java index ce3a0c5ea4..92b11cc7d0 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java @@ -23,6 +23,8 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -48,6 
+50,7 @@ public class MetricsUtil { private static JvmGcMetrics gc; private static List<Tag> commonTags; + private static Pattern camelCasePattern = Pattern.compile("[a-z][A-Z][a-z]"); public static void initializeMetrics(final AccumuloConfiguration conf, final String appName, final HostAndPort address) throws ClassNotFoundException, InstantiationException, @@ -121,6 +124,33 @@ public class MetricsUtil { return commonTags; } + /** + * Centralize any specific string formatting for metric names and/or tags. Ensure strings match + * the micrometer naming convention. + */ + public static String formatString(String name) { + + // Handle spaces + name = name.replace(" ", "."); + // Handle snake_case notation + name = name.replace("_", "."); + // Handle Hyphens + name = name.replace("-", "."); + + // Handle camelCase notation + Matcher matcher = camelCasePattern.matcher(name); + StringBuilder output = new StringBuilder(name); + int insertCount = 0; + while (matcher.find()) { + // Pattern matches at a lowercase letter, but the insert is at the second position. + output.insert(matcher.start() + 1 + insertCount, "."); + // The correct index position will shift as inserts occur. + insertCount++; + } + name = output.toString(); + return name.toLowerCase(); + } + public static void close() { if (gc != null) { gc.close(); diff --git a/core/src/test/java/org/apache/accumulo/core/metrics/MetricsUtilTest.java b/core/src/test/java/org/apache/accumulo/core/metrics/MetricsUtilTest.java new file mode 100644 index 0000000000..0c8d64b60b --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/metrics/MetricsUtilTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.metrics; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetricsUtilTest { + + private static final Logger log = LoggerFactory.getLogger(MetricsUtilTest.class); + + @Test + public void testLabelFormatting() { + Map.of("camelCase", "camel.case", "camelCamelCamelCase", "camel.camel.camel.case", "snake_case", + "snake.case", "normal.label", "normal.label", "space separated", "space.separated", + "Capital", "capital", "hyphen-ated", "hyphen.ated").forEach((label, correctFormat) -> { + log.info("Testing Label: {}", label); + String output = MetricsUtil.formatString(label); + assertTrue(output.contentEquals(correctFormat)); + }); + } +} diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 845a295224..2f1ac314d3 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -629,7 +629,10 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { throw new IllegalStateException("No Compactor groups 
configured."); } for (String name : groups) { - config.getClusterServerConfiguration().addCompactorResourceGroup(name, 1); + // Allow user override + if (!config.getClusterServerConfiguration().getCompactorConfiguration().containsKey(name)) { + config.getClusterServerConfiguration().addCompactorResourceGroup(name, 1); + } } } catch (ClassNotFoundException e) { throw new IllegalArgumentException("Unable to find declared CompactionPlanner class", e); diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java index 7f84529daf..895b1b6f4a 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java @@ -166,6 +166,8 @@ public class MiniAccumuloConfigImpl { mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true"); + mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(), + Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue()); mergeProp(Property.TSERV_COMPACTION_SERVICE_ROOT_PLANNER.getKey(), Property.TSERV_COMPACTION_SERVICE_ROOT_PLANNER.getDefaultValue()); mergeProp(Property.TSERV_COMPACTION_SERVICE_ROOT_EXECUTORS.getKey(), diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index eb56144d74..09c673aef0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1166,7 +1166,8 @@ public class Manager extends AbstractServer final ServerContext context = getContext(); final String zroot = getZooKeeperRoot(); - this.compactionJobQueues = new CompactionJobQueues(); + this.compactionJobQueues = new CompactionJobQueues( + 
getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE)); // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health // when a hot-standby diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index aa9477818b..ae1c278147 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -94,6 +95,8 @@ public class CompactionJobPriorityQueue { // case where tablets decided to issues different compaction jobs than what is currently queued. private final TreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue; private final int maxSize; + private final AtomicLong rejectedJobs; + private final AtomicLong dequeuedJobs; // This map tracks what jobs a tablet currently has in the queue. Its used to efficiently remove // jobs in the queue when new jobs are queued for a tablet. 
@@ -108,6 +111,8 @@ public class CompactionJobPriorityQueue { this.maxSize = maxSize; this.tabletJobs = new HashMap<>(); this.executorId = executorId; + this.rejectedJobs = new AtomicLong(0); + this.dequeuedJobs = new AtomicLong(0); } public synchronized boolean add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { @@ -136,10 +141,31 @@ public class CompactionJobPriorityQueue { return true; } + public long getMaxSize() { + return maxSize; + } + + public long getRejectedJobs() { + return rejectedJobs.get(); + } + + public long getDequeuedJobs() { + return dequeuedJobs.get(); + } + + public synchronized long getQueuedJobs() { + return jobQueue.size(); + } + + public synchronized long getLowestPriority() { + return jobQueue.lastKey().job.getPriority(); + } + public synchronized CompactionJobQueues.MetaJob poll() { var first = jobQueue.pollFirstEntry(); if (first != null) { + dequeuedJobs.getAndIncrement(); var extent = first.getValue().getTabletMetadata().getExtent(); List<CjpqKey> jobs = tabletJobs.get(extent); checkState(jobs.remove(first.getKey())); @@ -147,7 +173,6 @@ public class CompactionJobPriorityQueue { tabletJobs.remove(extent); } } - return first == null ? 
null : first.getValue(); } @@ -174,6 +199,7 @@ public class CompactionJobPriorityQueue { if (job.getPriority() <= lastEntry.job.getPriority()) { // the queue is full and this job has a lower or same priority than the lowest job in the // queue, so do not add it + rejectedJobs.getAndIncrement(); return null; } else { // the new job has a higher priority than the lowest job in the queue, so remove the lowest diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index 02d037dff0..bce1f05d7f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.queue; import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentHashMap.KeySetView; import java.util.stream.Collectors; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -41,6 +42,12 @@ public class CompactionJobQueues { private final ConcurrentHashMap<CompactionExecutorId,CompactionJobPriorityQueue> priorityQueues = new ConcurrentHashMap<>(); + private final int queueSize; + + public CompactionJobQueues(int queueSize) { + this.queueSize = queueSize; + } + public void add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { if (jobs.size() == 1) { var executorId = jobs.iterator().next().getExecutor(); @@ -51,6 +58,39 @@ public class CompactionJobQueues { } } + public KeySetView<CompactionExecutorId,CompactionJobPriorityQueue> getQueueIds() { + return priorityQueues.keySet(); + } + + public long getQueueMaxSize(CompactionExecutorId executorId) { + var prioQ = priorityQueues.get(executorId); + return prioQ == null ? 
0 : prioQ.getMaxSize(); + } + + public long getQueuedJobs(CompactionExecutorId executorId) { + var prioQ = priorityQueues.get(executorId); + return prioQ == null ? 0 : prioQ.getQueuedJobs(); + } + + public long getDequeuedJobs(CompactionExecutorId executorId) { + var prioQ = priorityQueues.get(executorId); + return prioQ == null ? 0 : prioQ.getDequeuedJobs(); + } + + public long getRejectedJobs(CompactionExecutorId executorId) { + var prioQ = priorityQueues.get(executorId); + return prioQ == null ? 0 : prioQ.getRejectedJobs(); + } + + public long getLowestPriority(CompactionExecutorId executorId) { + var prioQ = priorityQueues.get(executorId); + return prioQ == null ? 0 : prioQ.getLowestPriority(); + } + + public long getQueueCount() { + return priorityQueues.mappingCount(); + } + public static class MetaJob { private final CompactionJob job; @@ -87,7 +127,6 @@ public class CompactionJobQueues { } }); } - return mj; } @@ -101,14 +140,13 @@ public class CompactionJobQueues { + ",kind:" + job.getKind()).collect(Collectors.toList())); } - // TODO make max size configurable var pq = priorityQueues.computeIfAbsent(executorId, - eid -> new CompactionJobPriorityQueue(eid, 10000)); + eid -> new CompactionJobPriorityQueue(eid, queueSize)); while (!pq.add(tabletMetadata, jobs)) { // This loop handles race condition where poll() closes empty priority queues. The queue could // be closed after its obtained from the map and before add is called. 
pq = priorityQueues.computeIfAbsent(executorId, - eid -> new CompactionJobPriorityQueue(eid, 10000)); + eid -> new CompactionJobPriorityQueue(eid, queueSize)); } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java index 285df23a69..cd3a52955d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java @@ -31,16 +31,19 @@ import io.micrometer.core.instrument.MeterRegistry; public class ManagerMetrics implements MetricsProducer { private final FateMetrics fateMetrics; + private final QueueMetrics queueMetrics; public ManagerMetrics(final AccumuloConfiguration conf, final Manager manager) { requireNonNull(conf, "AccumuloConfiguration must not be null"); requireNonNull(conf, "Manager must not be null"); fateMetrics = new FateMetrics(manager.getContext(), conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)); + queueMetrics = new QueueMetrics(manager.getCompactionQueues()); } @Override public void registerMetrics(MeterRegistry registry) { fateMetrics.registerMetrics(registry); + queueMetrics.registerMetrics(registry); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java new file mode 100644 index 0000000000..d73fefc64f --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.metrics; + +import static org.apache.accumulo.core.metrics.MetricsUtil.formatString; +import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; + +public class QueueMetrics implements MetricsProducer { + private static final long DEFAULT_MIN_REFRESH_DELAY = TimeUnit.SECONDS.toMillis(5); + private MeterRegistry meterRegistry = null; + private final CompactionJobQueues compactionJobQueues; + private AtomicLong queueCount; + + public QueueMetrics(CompactionJobQueues compactionJobQueues) { + this.compactionJobQueues = compactionJobQueues; + ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools() + .createScheduledExecutorService(1, "queueMetricsPoller", false); + Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow)); + 
ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update, + DEFAULT_MIN_REFRESH_DELAY, DEFAULT_MIN_REFRESH_DELAY, TimeUnit.MILLISECONDS)); + } + + public void update() { + + Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues, + CompactionJobQueues::getQueueCount) + .description("Number of current Queues").tags(getCommonTags()).register(meterRegistry); + + for (CompactionExecutorId ceid : compactionJobQueues.getQueueIds()) { + // Normalize the queueId to match metrics tag naming convention. + String queueId = formatString(ceid.toString()); + + // Register queues by ID rather than by object as queues can be deleted. + Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, ceid, + compactionJobQueues::getQueueMaxSize) + .description("Length of priority queues") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + + Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, ceid, + compactionJobQueues::getQueuedJobs) + .description("Count of queued jobs") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + + Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, ceid, + compactionJobQueues::getDequeuedJobs) + .description("Count of jobs dequeued") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + + Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, ceid, + compactionJobQueues::getRejectedJobs) + .description("Count of rejected jobs") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + + Gauge + .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, ceid, + compactionJobQueues::getLowestPriority) + .description("Lowest priority queued job") + .tags(Tags.concat(getCommonTags(), "queue.id", queueId)).register(meterRegistry); + } + } + + @Override + public void registerMetrics(MeterRegistry registry) { + this.meterRegistry = registry; 
+ } +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java new file mode 100644 index 0000000000..cc5a423eab --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.compaction; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.file.FileSKVWriter; +import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.metadata.UnreferencedTabletFile; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; +import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; +import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory; +import 
org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.minicluster.MemoryUnit; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.functional.CompactionIT; +import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory; +import org.apache.accumulo.test.metrics.TestStatsDSink; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +// Use the shared Mini Cluster for all metric tests as Compaction Resources can be spun up and down per test. +public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { + + public static final Logger log = LoggerFactory.getLogger(CompactionPriorityQueueMetricsIT.class); + + private static TestStatsDSink sink; + private String tableName; + private TableId tableId; + private AccumuloConfiguration aconf; + private FileSystem fs; + private String rootPath; + + public static final String QUEUE1 = "METRICSQ1"; + public static final String QUEUE1_METRIC_LABEL = "e." 
+ MetricsUtil.formatString(QUEUE1); + public static final String QUEUE1_SERVICE = "Q1"; + public static final int QUEUE1_SIZE = 6; + + @BeforeEach + public void setupMetricsTest() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + tableName = getUniqueNames(1)[0]; + + Map<String,String> props = + Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(), + "table.compaction.dispatcher.opts.service", QUEUE1_SERVICE); + NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props); + c.tableOperations().create(tableName, ntc); + + tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName)); + aconf = getCluster().getServerContext().getConfiguration(); + fs = getCluster().getFileSystem(); + rootPath = getCluster().getTemporaryPath().toString(); + } + } + + private String getDir(String testName) throws Exception { + String dir = rootPath + testName + getUniqueNames(1)[0]; + fs.delete(new Path(dir), true); + return dir; + } + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(4); + } + + @BeforeAll + public static void setup() throws Exception { + sink = new TestStatsDSink(); + SharedMiniClusterBase.startMiniClusterWithConfig(new CompactionPriorityQueueMetricsITConfig()); + } + + @AfterAll + public static void teardown() { + SharedMiniClusterBase.stopMiniCluster(); + if (sink != null) { + sink.close(); + } + } + + public static class CompactionPriorityQueueMetricsITConfig + implements MiniClusterConfigurationCallback { + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) { + cfg.setMemory(ServerType.TABLET_SERVER, 512, MemoryUnit.MEGABYTE); + // Zero the default compactors + cfg.getClusterServerConfiguration().setNumDefaultCompactors(0); + + // Create a new queue with zero compactors. + cfg.setProperty("tserver.compaction.major.service." 
+ QUEUE1_SERVICE + ".planner", + DefaultCompactionPlanner.class.getName()); + cfg.setProperty( + "tserver.compaction.major.service." + QUEUE1_SERVICE + ".planner.opts.executors", + "[{'name':'all', 'type': 'external', 'group': '" + QUEUE1 + "'}]"); + + cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, "6"); + cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0); + + // use raw local file system + conf.set("fs.file.impl", RawLocalFileSystem.class.getName()); + // Tell the server processes to use a StatsDMeterRegistry that will be configured + // to push all metrics to the sink we started. + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, + TestStatsDRegistryFactory.class.getName()); + Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); + } + } + + private String writeData(String file, AccumuloConfiguration aconf, int s, int e) + throws Exception { + FileSystem fs = getCluster().getFileSystem(); + String filename = file + RFile.EXTENSION; + try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder() + .forFile(UnreferencedTabletFile.of(fs, new Path(filename)), fs, fs.getConf(), + NoCryptoServiceFactory.NONE) + .withTableConfiguration(aconf).build()) { + writer.startDefaultLocalityGroup(); + for (int i = s; i <= e; i++) { + writer.append(new Key(new Text(row(i))), new Value(Integer.toString(i))); + } + } + + return hash(filename); + } + + private void addSplits(AccumuloClient client, String tableName, String splitString) + throws Exception { + SortedSet<Text> splits = new TreeSet<>(); + for (String split : splitString.split(" ")) { + splits.add(new Text(split)); + } + client.tableOperations().addSplits(tableName, splits); + } + + private void verifyData(AccumuloClient client, String table, int 
start, int end, boolean setTime) + throws Exception { + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + + Iterator<Map.Entry<Key,Value>> iter = scanner.iterator(); + + for (int i = start; i <= end; i++) { + if (!iter.hasNext()) { + throw new Exception("row " + i + " not found"); + } + + Map.Entry<Key,Value> entry = iter.next(); + + String row = String.format("%04d", i); + + if (!entry.getKey().getRow().equals(new Text(row))) { + throw new Exception("unexpected row " + entry.getKey() + " " + i); + } + + if (Integer.parseInt(entry.getValue().toString()) != i) { + throw new Exception("unexpected value " + entry + " " + i); + } + + if (setTime) { + assertEquals(1L, entry.getKey().getTimestamp()); + } + } + + if (iter.hasNext()) { + throw new Exception("found more than expected " + iter.next()); + } + } + } + + @SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN", "WEAK_MESSAGE_DIGEST_SHA1"}, + justification = "path provided by test; sha-1 is okay for test") + private String hash(String filename) { + try { + byte[] data = Files.readAllBytes(Paths.get(filename.replaceFirst("^file:", ""))); + byte[] hash = MessageDigest.getInstance("SHA1").digest(data); + return new BigInteger(1, hash).toString(16); + } catch (IOException | NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + + private static String row(int r) { + return String.format("%04d", r); + } + + @Test + public void testQueueMetrics() throws Exception { + // Metrics collector Thread + final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>(); + final AtomicBoolean shutdownTailer = new AtomicBoolean(false); + + Thread thread = Threads.createThread("metric-tailer", () -> { + while (!shutdownTailer.get()) { + List<String> statsDMetrics = sink.getLines(); + for (String s : statsDMetrics) { + if (shutdownTailer.get()) { + break; + } + if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) { + 
queueMetrics.add(TestStatsDSink.parseStatsDMetric(s)); + } + } + } + }); + thread.start(); + + long highestFileCount = 0L; + ServerContext context = getCluster().getServerContext(); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + // Create splits so there are two groupings of tablets with similar file counts. + List<String> splitPoints = + List.of("500", "1000", "1500", "2000", "3750", "5500", "7250", "9000"); + for (String splitPoint : splitPoints) { + addSplits(c, tableName, splitPoint); + } + + for (int i = 0; i < 100; i++) { + writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1); + } + c.tableOperations().importDirectory(dir).to(tableName).load(); + + IteratorSetting iterSetting = new IteratorSetting(100, CompactionIT.TestFilter.class); + iterSetting.addOption("expectedQ", QUEUE1); + iterSetting.addOption("modulus", 3 + ""); + CompactionConfig config = + new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false); + c.tableOperations().compact(tableName, config); + + try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) { + // Get each tablet's file sizes + for (TabletMetadata tablet : tm) { + long fileSize = tablet.getFiles().size(); + log.info("Number of files in tablet {}: {}", tablet.getExtent().toString(), fileSize); + highestFileCount = Math.max(highestFileCount, fileSize); + } + } + verifyData(c, tableName, 0, 100 * 100 - 1, false); + } + + boolean sawMetricsQ1 = false; + while (!sawMetricsQ1) { + while (!queueMetrics.isEmpty()) { + var qm = queueMetrics.take(); + if (qm.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) + && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + if (Integer.parseInt(qm.getValue()) > 0) { + sawMetricsQ1 = true; + } + } + } + // Current poll rate of the TestStatsDRegistryFactory 
is 3 seconds + // If metrics are not found in the queue, sleep until the next poll. + UtilWaitThread.sleep(3500); + } + + // Set lowest priority to the lowest possible system compaction priority + long lowestPriority = Short.MIN_VALUE; + long rejectedCount = 0L; + int queueSize = 0; + + boolean sawQueues = false; + // An empty queue means that the last known value is the most recent. + while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + rejectedCount = Long.parseLong(metric.getValue()); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + lowestPriority = Math.max(lowestPriority, Long.parseLong(metric.getValue())); + } else if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + queueSize = Integer.parseInt(metric.getValue()); + } else if (metric.getName().contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) { + sawQueues = true; + } else { + log.debug("{}", metric); + } + } + + // Confirm metrics were generated and in some cases, validate contents. + assertTrue(rejectedCount > 0L); + + // Priority is the file counts + number of compactions for that tablet. + // The lowestPriority job in the queue should have been + // at least 1 count higher than the highest file count. 
+ assertTrue(lowestPriority > highestFileCount); + + // Multiple Queues have been created + assertTrue(sawQueues); + + // Queue size matches the intended queue size + assertEquals(QUEUE1_SIZE, queueSize); + + // Start Compactors + getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 1); + getCluster().getClusterControl().start(ServerType.COMPACTOR); + + boolean emptyQueue = false; + + // Make sure that metrics added to the queue are recent + UtilWaitThread.sleep(3500); + + while (!emptyQueue) { + while (!queueMetrics.isEmpty()) { + var metric = queueMetrics.take(); + if (metric.getName() + .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED) + && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) { + if (Integer.parseInt(metric.getValue()) == 0) { + emptyQueue = true; + } + } + } + UtilWaitThread.sleep(3500); + } + + shutdownTailer.set(true); + thread.join(); + } +}