This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 6e745d16fa Add Metrics for CompactionJobPriority Queues (#3551)
6e745d16fa is described below

commit 6e745d16fa40524b60f63adc2873ab1f42b2454a
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Wed Jul 19 22:38:35 2023 -0400

    Add Metrics for CompactionJobPriority Queues (#3551)
    
    Adds the following metrics for CompactionJobPriority:
    
    * Number of Queues
    * Queue Size
    * Number of Queued Jobs
    * Number of Dequeued Jobs
    * Number of Rejected Jobs
    * Lowest Job Priority Metric
    
    Other changes in commit
    
    * Adds configurable property to change CompactionQueue size and test
    rejectedJobs. See #3635 for more details.
    * Switches metric names to dot notation
    * Adds CompactionPriorityQueueMetricsIT
    * Adds user-defined CompactionResourceGroup override to MAC
---
 .../org/apache/accumulo/core/conf/Property.java    |   3 +
 .../accumulo/core/metrics/MetricsProducer.java     |  55 ++-
 .../apache/accumulo/core/metrics/MetricsUtil.java  |  30 ++
 .../accumulo/core/metrics/MetricsUtilTest.java     |  43 +++
 .../miniclusterImpl/MiniAccumuloClusterImpl.java   |   5 +-
 .../miniclusterImpl/MiniAccumuloConfigImpl.java    |   2 +
 .../java/org/apache/accumulo/manager/Manager.java  |   3 +-
 .../queue/CompactionJobPriorityQueue.java          |  28 +-
 .../compaction/queue/CompactionJobQueues.java      |  46 ++-
 .../accumulo/manager/metrics/ManagerMetrics.java   |   3 +
 .../accumulo/manager/metrics/QueueMetrics.java     | 100 ++++++
 .../CompactionPriorityQueueMetricsIT.java          | 397 +++++++++++++++++++++
 12 files changed, 707 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 3fca886afc..8cdf429882 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -373,6 +373,9 @@ public enum Property {
       "1.10.0"),
   MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", 
"8", PropertyType.COUNT,
       "The number of threads used to inspect tablets files to find split 
points.", "4.0.0"),
+
+  
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
+      "10000", PropertyType.COUNT, "The max size of the priority queue", 
"4.0"),
   // properties that are specific to scan server behavior
   @Experimental
   SSERV_PREFIX("sserver.", null, PropertyType.PREFIX,
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 322b236e58..820a432bd2 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -48,7 +48,7 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td>N/A</td>
  * <td>N/A</td>
  * <td>{@link #METRICS_LOW_MEMORY}</td>
- * <td>Guage</td>
+ * <td>Gauge</td>
  * <td>reports 1 when process memory usage is above threshold, 0 when memory 
is okay</td>
  * </tr>
  * <tr>
@@ -59,6 +59,48 @@ import io.micrometer.core.instrument.MeterRegistry;
  * <td></td>
  * </tr>
  * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUES}</td>
+ * <td>Gauge</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH}</td>
+ * <td>Gauge</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY}</td>
+ * <td>Gauge</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED}</td>
+ * <td>Gauge</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED}</td>
+ * <td>Gauge</td>
+ * <td></td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@link #METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED}</td>
+ * <td>Gauge</td>
+ * <td></td>
+ * </tr>
+ * <tr>
  * <td>currentFateOps</td>
  * <td>Gauge</td>
  * <td>{@link #METRICS_FATE_TOTAL_IN_PROGRESS}</td>
@@ -597,6 +639,17 @@ public interface MetricsProducer {
   String METRICS_LOW_MEMORY = "accumulo.detected.low.memory";
   String METRICS_COMPACTOR_PREFIX = "accumulo.compactor.";
   String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX + 
"majc.stuck";
+  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUES = METRICS_COMPACTOR_PREFIX + 
"queue.count";
+  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH = 
METRICS_COMPACTOR_PREFIX + "queue.length";
+  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED =
+      METRICS_COMPACTOR_PREFIX + "queue.jobs.dequeued";
+  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED =
+      METRICS_COMPACTOR_PREFIX + "queue.jobs.queued";
+  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED =
+      METRICS_COMPACTOR_PREFIX + "queue.jobs.rejected";
+
+  String METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY =
+      METRICS_COMPACTOR_PREFIX + "queue.jobs.priority";
 
   String METRICS_FATE_PREFIX = "accumulo.fate.";
   String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX + 
"ops.in_progress_by_type";
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java 
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java
index ce3a0c5ea4..92b11cc7d0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.accumulo.core.classloader.ClassLoaderUtil;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -48,6 +50,7 @@ public class MetricsUtil {
 
   private static JvmGcMetrics gc;
   private static List<Tag> commonTags;
+  private static Pattern camelCasePattern = Pattern.compile("[a-z][A-Z][a-z]");
 
   public static void initializeMetrics(final AccumuloConfiguration conf, final 
String appName,
       final HostAndPort address) throws ClassNotFoundException, 
InstantiationException,
@@ -121,6 +124,33 @@ public class MetricsUtil {
     return commonTags;
   }
 
+  /**
+   * Centralize any specific string formatting for metric names and/or tags. 
Ensure strings match
+   * the micrometer naming convention.
+   */
+  public static String formatString(String name) {
+
+    // Handle spaces
+    name = name.replace(" ", ".");
+    // Handle snake_case notation
+    name = name.replace("_", ".");
+    // Handle Hyphens
+    name = name.replace("-", ".");
+
+    // Handle camelCase notation
+    Matcher matcher = camelCasePattern.matcher(name);
+    StringBuilder output = new StringBuilder(name);
+    int insertCount = 0;
+    while (matcher.find()) {
+      // Pattern matches at a lowercase letter, but the insert is at the 
second position.
+      output.insert(matcher.start() + 1 + insertCount, ".");
+      // The correct index position will shift as inserts occur.
+      insertCount++;
+    }
+    name = output.toString();
+    return name.toLowerCase();
+  }
+
   public static void close() {
     if (gc != null) {
       gc.close();
diff --git 
a/core/src/test/java/org/apache/accumulo/core/metrics/MetricsUtilTest.java 
b/core/src/test/java/org/apache/accumulo/core/metrics/MetricsUtilTest.java
new file mode 100644
index 0000000000..0c8d64b60b
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/metrics/MetricsUtilTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.metrics;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsUtilTest {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MetricsUtilTest.class);
+
+  @Test
+  public void testLabelFormatting() {
+    Map.of("camelCase", "camel.case", "camelCamelCamelCase", 
"camel.camel.camel.case", "snake_case",
+        "snake.case", "normal.label", "normal.label", "space separated", 
"space.separated",
+        "Capital", "capital", "hyphen-ated", "hyphen.ated").forEach((label, 
correctFormat) -> {
+          log.info("Testing Label: {}", label);
+          String output = MetricsUtil.formatString(label);
+          assertTrue(output.contentEquals(correctFormat));
+        });
+  }
+}
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 845a295224..2f1ac314d3 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -629,7 +629,10 @@ public class MiniAccumuloClusterImpl implements 
AccumuloCluster {
         throw new IllegalStateException("No Compactor groups configured.");
       }
       for (String name : groups) {
-        config.getClusterServerConfiguration().addCompactorResourceGroup(name, 
1);
+        // Allow user override
+        if 
(!config.getClusterServerConfiguration().getCompactorConfiguration().containsKey(name))
 {
+          
config.getClusterServerConfiguration().addCompactorResourceGroup(name, 1);
+        }
       }
     } catch (ClassNotFoundException e) {
       throw new IllegalArgumentException("Unable to find declared 
CompactionPlanner class", e);
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 7f84529daf..895b1b6f4a 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -166,6 +166,8 @@ public class MiniAccumuloConfigImpl {
 
       mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true");
 
+      
mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(),
+          
Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue());
       mergeProp(Property.TSERV_COMPACTION_SERVICE_ROOT_PLANNER.getKey(),
           Property.TSERV_COMPACTION_SERVICE_ROOT_PLANNER.getDefaultValue());
       mergeProp(Property.TSERV_COMPACTION_SERVICE_ROOT_EXECUTORS.getKey(),
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index eb56144d74..09c673aef0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1166,7 +1166,8 @@ public class Manager extends AbstractServer
     final ServerContext context = getContext();
     final String zroot = getZooKeeperRoot();
 
-    this.compactionJobQueues = new CompactionJobQueues();
+    this.compactionJobQueues = new CompactionJobQueues(
+        
getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE));
 
     // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a 
sign of process health
     // when a hot-standby
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index aa9477818b..ae1c278147 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -94,6 +95,8 @@ public class CompactionJobPriorityQueue {
   // case where tablets decided to issues different compaction jobs than what 
is currently queued.
   private final TreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue;
   private final int maxSize;
+  private final AtomicLong rejectedJobs;
+  private final AtomicLong dequeuedJobs;
 
   // This map tracks what jobs a tablet currently has in the queue. Its used 
to efficiently remove
   // jobs in the queue when new jobs are queued for a tablet.
@@ -108,6 +111,8 @@ public class CompactionJobPriorityQueue {
     this.maxSize = maxSize;
     this.tabletJobs = new HashMap<>();
     this.executorId = executorId;
+    this.rejectedJobs = new AtomicLong(0);
+    this.dequeuedJobs = new AtomicLong(0);
   }
 
   public synchronized boolean add(TabletMetadata tabletMetadata, 
Collection<CompactionJob> jobs) {
@@ -136,10 +141,31 @@ public class CompactionJobPriorityQueue {
     return true;
   }
 
+  public long getMaxSize() {
+    return maxSize;
+  }
+
+  public long getRejectedJobs() {
+    return rejectedJobs.get();
+  }
+
+  public long getDequeuedJobs() {
+    return dequeuedJobs.get();
+  }
+
+  public synchronized long getQueuedJobs() {
+    return jobQueue.size();
+  }
+
+  public synchronized long getLowestPriority() {
+    return jobQueue.lastKey().job.getPriority();
+  }
+
   public synchronized CompactionJobQueues.MetaJob poll() {
     var first = jobQueue.pollFirstEntry();
 
     if (first != null) {
+      dequeuedJobs.getAndIncrement();
       var extent = first.getValue().getTabletMetadata().getExtent();
       List<CjpqKey> jobs = tabletJobs.get(extent);
       checkState(jobs.remove(first.getKey()));
@@ -147,7 +173,6 @@ public class CompactionJobPriorityQueue {
         tabletJobs.remove(extent);
       }
     }
-
     return first == null ? null : first.getValue();
   }
 
@@ -174,6 +199,7 @@ public class CompactionJobPriorityQueue {
       if (job.getPriority() <= lastEntry.job.getPriority()) {
         // the queue is full and this job has a lower or same priority than 
the lowest job in the
         // queue, so do not add it
+        rejectedJobs.getAndIncrement();
         return null;
       } else {
         // the new job has a higher priority than the lowest job in the queue, 
so remove the lowest
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index 02d037dff0..bce1f05d7f 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.queue;
 
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentHashMap.KeySetView;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -41,6 +42,12 @@ public class CompactionJobQueues {
   private final 
ConcurrentHashMap<CompactionExecutorId,CompactionJobPriorityQueue> 
priorityQueues =
       new ConcurrentHashMap<>();
 
+  private final int queueSize;
+
+  public CompactionJobQueues(int queueSize) {
+    this.queueSize = queueSize;
+  }
+
   public void add(TabletMetadata tabletMetadata, Collection<CompactionJob> 
jobs) {
     if (jobs.size() == 1) {
       var executorId = jobs.iterator().next().getExecutor();
@@ -51,6 +58,39 @@ public class CompactionJobQueues {
     }
   }
 
+  public KeySetView<CompactionExecutorId,CompactionJobPriorityQueue> 
getQueueIds() {
+    return priorityQueues.keySet();
+  }
+
+  public long getQueueMaxSize(CompactionExecutorId executorId) {
+    var prioQ = priorityQueues.get(executorId);
+    return prioQ == null ? 0 : prioQ.getMaxSize();
+  }
+
+  public long getQueuedJobs(CompactionExecutorId executorId) {
+    var prioQ = priorityQueues.get(executorId);
+    return prioQ == null ? 0 : prioQ.getQueuedJobs();
+  }
+
+  public long getDequeuedJobs(CompactionExecutorId executorId) {
+    var prioQ = priorityQueues.get(executorId);
+    return prioQ == null ? 0 : prioQ.getDequeuedJobs();
+  }
+
+  public long getRejectedJobs(CompactionExecutorId executorId) {
+    var prioQ = priorityQueues.get(executorId);
+    return prioQ == null ? 0 : prioQ.getRejectedJobs();
+  }
+
+  public long getLowestPriority(CompactionExecutorId executorId) {
+    var prioQ = priorityQueues.get(executorId);
+    return prioQ == null ? 0 : prioQ.getLowestPriority();
+  }
+
+  public long getQueueCount() {
+    return priorityQueues.mappingCount();
+  }
+
   public static class MetaJob {
     private final CompactionJob job;
 
@@ -87,7 +127,6 @@ public class CompactionJobQueues {
         }
       });
     }
-
     return mj;
   }
 
@@ -101,14 +140,13 @@ public class CompactionJobQueues {
               + ",kind:" + job.getKind()).collect(Collectors.toList()));
     }
 
-    // TODO make max size configurable
     var pq = priorityQueues.computeIfAbsent(executorId,
-        eid -> new CompactionJobPriorityQueue(eid, 10000));
+        eid -> new CompactionJobPriorityQueue(eid, queueSize));
     while (!pq.add(tabletMetadata, jobs)) {
       // This loop handles race condition where poll() closes empty priority 
queues. The queue could
       // be closed after its obtained from the map and before add is called.
       pq = priorityQueues.computeIfAbsent(executorId,
-          eid -> new CompactionJobPriorityQueue(eid, 10000));
+          eid -> new CompactionJobPriorityQueue(eid, queueSize));
     }
   }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
index 285df23a69..cd3a52955d 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
@@ -31,16 +31,19 @@ import io.micrometer.core.instrument.MeterRegistry;
 public class ManagerMetrics implements MetricsProducer {
 
   private final FateMetrics fateMetrics;
+  private final QueueMetrics queueMetrics;
 
   public ManagerMetrics(final AccumuloConfiguration conf, final Manager 
manager) {
     requireNonNull(conf, "AccumuloConfiguration must not be null");
     requireNonNull(conf, "Manager must not be null");
     fateMetrics = new FateMetrics(manager.getContext(),
         
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL));
+    queueMetrics = new QueueMetrics(manager.getCompactionQueues());
   }
 
   @Override
   public void registerMetrics(MeterRegistry registry) {
     fateMetrics.registerMetrics(registry);
+    queueMetrics.registerMetrics(registry);
   }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
new file mode 100644
index 0000000000..d73fefc64f
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/QueueMetrics.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.metrics;
+
+import static org.apache.accumulo.core.metrics.MetricsUtil.formatString;
+import static org.apache.accumulo.core.metrics.MetricsUtil.getCommonTags;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues;
+
+import io.micrometer.core.instrument.Gauge;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tags;
+
+public class QueueMetrics implements MetricsProducer {
+  private static final long DEFAULT_MIN_REFRESH_DELAY = 
TimeUnit.SECONDS.toMillis(5);
+  private MeterRegistry meterRegistry = null;
+  private final CompactionJobQueues compactionJobQueues;
+  private AtomicLong queueCount;
+
+  public QueueMetrics(CompactionJobQueues compactionJobQueues) {
+    this.compactionJobQueues = compactionJobQueues;
+    ScheduledExecutorService scheduler = ThreadPools.getServerThreadPools()
+        .createScheduledExecutorService(1, "queueMetricsPoller", false);
+    Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
+    
ThreadPools.watchNonCriticalScheduledTask(scheduler.scheduleAtFixedRate(this::update,
+        DEFAULT_MIN_REFRESH_DELAY, DEFAULT_MIN_REFRESH_DELAY, 
TimeUnit.MILLISECONDS));
+  }
+
+  public void update() {
+
+    Gauge
+        .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUES, compactionJobQueues,
+            CompactionJobQueues::getQueueCount)
+        .description("Number of current 
Queues").tags(getCommonTags()).register(meterRegistry);
+
+    for (CompactionExecutorId ceid : compactionJobQueues.getQueueIds()) {
+      // Normalize the queueId to match metrics tag naming convention.
+      String queueId = formatString(ceid.toString());
+
+      // Register queues by ID rather than by object as queues can be deleted.
+      Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH, ceid,
+              compactionJobQueues::getQueueMaxSize)
+          .description("Length of priority queues")
+          .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
+
+      Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED, ceid,
+              compactionJobQueues::getQueuedJobs)
+          .description("Count of queued jobs")
+          .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
+
+      Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED, ceid,
+              compactionJobQueues::getDequeuedJobs)
+          .description("Count of jobs dequeued")
+          .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
+
+      Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED, ceid,
+              compactionJobQueues::getRejectedJobs)
+          .description("Count of rejected jobs")
+          .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
+
+      Gauge
+          .builder(METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY, ceid,
+              compactionJobQueues::getLowestPriority)
+          .description("Lowest priority queued job")
+          .tags(Tags.concat(getCommonTags(), "queue.id", 
queueId)).register(meterRegistry);
+    }
+  }
+
+  @Override
+  public void registerMetrics(MeterRegistry registry) {
+    this.meterRegistry = registry;
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
new file mode 100644
index 0000000000..cc5a423eab
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.compaction;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileOperations;
+import org.apache.accumulo.core.file.FileSKVWriter;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.metadata.UnreferencedTabletFile;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
+import org.apache.accumulo.core.spi.crypto.NoCryptoServiceFactory;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.functional.CompactionIT;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+// Use the shared Mini Cluster for all metric tests as Compaction Resources 
can be spun up and down per test.
+public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase {
+
+  public static final Logger log = 
LoggerFactory.getLogger(CompactionPriorityQueueMetricsIT.class);
+
+  private static TestStatsDSink sink;
+  private String tableName;
+  private TableId tableId;
+  private AccumuloConfiguration aconf;
+  private FileSystem fs;
+  private String rootPath;
+
+  public static final String QUEUE1 = "METRICSQ1";
+  public static final String QUEUE1_METRIC_LABEL = "e." + 
MetricsUtil.formatString(QUEUE1);
+  public static final String QUEUE1_SERVICE = "Q1";
+  public static final int QUEUE1_SIZE = 6;
+
+  @BeforeEach
+  public void setupMetricsTest() throws Exception {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+      tableName = getUniqueNames(1)[0];
+
+      Map<String,String> props =
+          Map.of("table.compaction.dispatcher", 
SimpleCompactionDispatcher.class.getName(),
+              "table.compaction.dispatcher.opts.service", QUEUE1_SERVICE);
+      NewTableConfiguration ntc = new 
NewTableConfiguration().setProperties(props);
+      c.tableOperations().create(tableName, ntc);
+
+      tableId = TableId.of(c.tableOperations().tableIdMap().get(tableName));
+      aconf = getCluster().getServerContext().getConfiguration();
+      fs = getCluster().getFileSystem();
+      rootPath = getCluster().getTemporaryPath().toString();
+    }
+  }
+
+  private String getDir(String testName) throws Exception {
+    String dir = rootPath + testName + getUniqueNames(1)[0];
+    fs.delete(new Path(dir), true);
+    return dir;
+  }
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(4);
+  }
+
+  @BeforeAll
+  public static void setup() throws Exception {
+    sink = new TestStatsDSink();
+    SharedMiniClusterBase.startMiniClusterWithConfig(new 
CompactionPriorityQueueMetricsITConfig());
+  }
+
+  @AfterAll
+  public static void teardown() {
+    SharedMiniClusterBase.stopMiniCluster();
+    if (sink != null) {
+      sink.close();
+    }
+  }
+
+  public static class CompactionPriorityQueueMetricsITConfig
+      implements MiniClusterConfigurationCallback {
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
conf) {
+      cfg.setMemory(ServerType.TABLET_SERVER, 512, MemoryUnit.MEGABYTE);
+      // Zero the default compactors
+      cfg.getClusterServerConfiguration().setNumDefaultCompactors(0);
+
+      // Create a new queue with zero compactors.
+      cfg.setProperty("tserver.compaction.major.service." + QUEUE1_SERVICE + 
".planner",
+          DefaultCompactionPlanner.class.getName());
+      cfg.setProperty(
+          "tserver.compaction.major.service." + QUEUE1_SERVICE + 
".planner.opts.executors",
+          "[{'name':'all', 'type': 'external', 'group': '" + QUEUE1 + "'}]");
+
+      cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, 
"6");
+      cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0);
+
+      // use raw local file system
+      conf.set("fs.file.impl", RawLocalFileSystem.class.getName());
+      // Tell the server processes to use a StatsDMeterRegistry that will be 
configured
+      // to push all metrics to the sink we started.
+      cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+      cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
+          TestStatsDRegistryFactory.class.getName());
+      Map<String,String> sysProps = 
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+          TestStatsDRegistryFactory.SERVER_PORT, 
Integer.toString(sink.getPort()));
+      cfg.setSystemProperties(sysProps);
+    }
+  }
+
+  private String writeData(String file, AccumuloConfiguration aconf, int s, 
int e)
+      throws Exception {
+    FileSystem fs = getCluster().getFileSystem();
+    String filename = file + RFile.EXTENSION;
+    try (FileSKVWriter writer = FileOperations.getInstance().newWriterBuilder()
+        .forFile(UnreferencedTabletFile.of(fs, new Path(filename)), fs, 
fs.getConf(),
+            NoCryptoServiceFactory.NONE)
+        .withTableConfiguration(aconf).build()) {
+      writer.startDefaultLocalityGroup();
+      for (int i = s; i <= e; i++) {
+        writer.append(new Key(new Text(row(i))), new 
Value(Integer.toString(i)));
+      }
+    }
+
+    return hash(filename);
+  }
+
+  private void addSplits(AccumuloClient client, String tableName, String 
splitString)
+      throws Exception {
+    SortedSet<Text> splits = new TreeSet<>();
+    for (String split : splitString.split(" ")) {
+      splits.add(new Text(split));
+    }
+    client.tableOperations().addSplits(tableName, splits);
+  }
+
+  private void verifyData(AccumuloClient client, String table, int start, int 
end, boolean setTime)
+      throws Exception {
+    try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
+
+      Iterator<Map.Entry<Key,Value>> iter = scanner.iterator();
+
+      for (int i = start; i <= end; i++) {
+        if (!iter.hasNext()) {
+          throw new Exception("row " + i + " not found");
+        }
+
+        Map.Entry<Key,Value> entry = iter.next();
+
+        String row = String.format("%04d", i);
+
+        if (!entry.getKey().getRow().equals(new Text(row))) {
+          throw new Exception("unexpected row " + entry.getKey() + " " + i);
+        }
+
+        if (Integer.parseInt(entry.getValue().toString()) != i) {
+          throw new Exception("unexpected value " + entry + " " + i);
+        }
+
+        if (setTime) {
+          assertEquals(1L, entry.getKey().getTimestamp());
+        }
+      }
+
+      if (iter.hasNext()) {
+        throw new Exception("found more than expected " + iter.next());
+      }
+    }
+  }
+
+  @SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN", 
"WEAK_MESSAGE_DIGEST_SHA1"},
+      justification = "path provided by test; sha-1 is okay for test")
+  private String hash(String filename) {
+    try {
+      byte[] data = 
Files.readAllBytes(Paths.get(filename.replaceFirst("^file:", "")));
+      byte[] hash = MessageDigest.getInstance("SHA1").digest(data);
+      return new BigInteger(1, hash).toString(16);
+    } catch (IOException | NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static String row(int r) {
+    return String.format("%04d", r);
+  }
+
+  /**
+   * Bulk imports data into a split table, requests a compaction that is directed at QUEUE1, and
+   * then checks the QUEUE1 job priority queue metrics received by the StatsD sink: jobs queued,
+   * jobs rejected, lowest job priority and queue length. Finally a compactor for QUEUE1 is
+   * started and the test waits until the queued job count reported for QUEUE1 reaches zero.
+   *
+   * <p>
+   * A background thread tails the sink for the whole test; it is always stopped and joined in a
+   * finally block so that a failed assertion does not leave it running.
+   */
+  @Test
+  public void testQueueMetrics() throws Exception {
+    // Metrics collector Thread
+    final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new LinkedBlockingQueue<>();
+    final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+
+    Thread thread = Threads.createThread("metric-tailer", () -> {
+      while (!shutdownTailer.get()) {
+        List<String> statsDMetrics = sink.getLines();
+        for (String s : statsDMetrics) {
+          if (shutdownTailer.get()) {
+            break;
+          }
+          // Only keep the compactor queue metrics; everything else is ignored.
+          if (s.startsWith(MetricsProducer.METRICS_COMPACTOR_PREFIX + "queue")) {
+            queueMetrics.add(TestStatsDSink.parseStatsDMetric(s));
+          }
+        }
+      }
+    });
+    thread.start();
+
+    try {
+      long highestFileCount = 0L;
+      ServerContext context = getCluster().getServerContext();
+      try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+        String dir = getDir("/testBulkFile-");
+        FileSystem fs = getCluster().getFileSystem();
+        fs.mkdirs(new Path(dir));
+
+        // Create splits so there are two groupings of tablets with similar file counts.
+        List<String> splitPoints =
+            List.of("500", "1000", "1500", "2000", "3750", "5500", "7250", "9000");
+        for (String splitPoint : splitPoints) {
+          addSplits(c, tableName, splitPoint);
+        }
+
+        for (int i = 0; i < 100; i++) {
+          writeData(dir + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
+        }
+        c.tableOperations().importDirectory(dir).to(tableName).load();
+
+        IteratorSetting iterSetting = new IteratorSetting(100, CompactionIT.TestFilter.class);
+        iterSetting.addOption("expectedQ", QUEUE1);
+        iterSetting.addOption("modulus", 3 + "");
+        CompactionConfig config =
+            new CompactionConfig().setIterators(List.of(iterSetting)).setWait(false);
+        c.tableOperations().compact(tableName, config);
+
+        try (TabletsMetadata tm =
+            context.getAmple().readTablets().forTable(tableId).build()) {
+          // Record the largest number of files held by any single tablet
+          for (TabletMetadata tablet : tm) {
+            long fileSize = tablet.getFiles().size();
+            log.info("Number of files in tablet {}: {}", tablet.getExtent().toString(),
+                fileSize);
+            highestFileCount = Math.max(highestFileCount, fileSize);
+          }
+        }
+        verifyData(c, tableName, 0, 100 * 100 - 1, false);
+      }
+
+      // Wait until a non-zero queued job count has been reported for QUEUE1.
+      boolean sawMetricsQ1 = false;
+      while (!sawMetricsQ1) {
+        while (!queueMetrics.isEmpty()) {
+          var qm = queueMetrics.take();
+          if (qm.getName()
+              .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+              && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+            if (Integer.parseInt(qm.getValue()) > 0) {
+              sawMetricsQ1 = true;
+            }
+          }
+        }
+        // Current poll rate of the TestStatsDRegistryFactory is 3 seconds
+        // If metrics are not found in the queue, sleep until the next poll.
+        UtilWaitThread.sleep(3500);
+      }
+
+      // Set lowest priority to the lowest possible system compaction priority
+      long lowestPriority = Short.MIN_VALUE;
+      long rejectedCount = 0L;
+      int queueSize = 0;
+
+      boolean sawQueues = false;
+      // An empty queue means that the last known value is the most recent.
+      while (!queueMetrics.isEmpty()) {
+        var metric = queueMetrics.take();
+        if (metric.getName()
+            .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED)
+            && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          rejectedCount = Long.parseLong(metric.getValue());
+        } else if (metric.getName()
+            .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY)
+            && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          // Keeps the largest value reported for this metric across all samples.
+          lowestPriority = Math.max(lowestPriority, Long.parseLong(metric.getValue()));
+        } else if (metric.getName()
+            .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH)
+            && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+          queueSize = Integer.parseInt(metric.getValue());
+        } else if (metric.getName()
+            .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUES)) {
+          sawQueues = true;
+        } else {
+          log.debug("{}", metric);
+        }
+      }
+
+      // Confirm metrics were generated and in some cases, validate contents.
+      assertTrue(rejectedCount > 0L);
+
+      // Priority is the file counts + number of compactions for that tablet.
+      // The lowestPriority job in the queue should have been
+      // at least 1 count higher than the highest file count.
+      assertTrue(lowestPriority > highestFileCount);
+
+      // Multiple Queues have been created
+      assertTrue(sawQueues);
+
+      // Queue size matches the intended queue size
+      assertEquals(QUEUE1_SIZE, queueSize);
+
+      // Start Compactors
+      getCluster().getConfig().getClusterServerConfiguration()
+          .addCompactorResourceGroup(QUEUE1, 1);
+      getCluster().getClusterControl().start(ServerType.COMPACTOR);
+
+      boolean emptyQueue = false;
+
+      // Make sure that metrics added to the queue are recent
+      UtilWaitThread.sleep(3500);
+
+      // Wait until a queued job count of zero has been reported for QUEUE1.
+      while (!emptyQueue) {
+        while (!queueMetrics.isEmpty()) {
+          var metric = queueMetrics.take();
+          if (metric.getName()
+              .contains(MetricsProducer.METRICS_COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED)
+              && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
+            if (Integer.parseInt(metric.getValue()) == 0) {
+              emptyQueue = true;
+            }
+          }
+        }
+        UtilWaitThread.sleep(3500);
+      }
+    } finally {
+      // Always stop the tailer, including when an assertion above fails.
+      shutdownTailer.set(true);
+      thread.join();
+    }
+  }
+}


Reply via email to