This is an automated email from the ASF dual-hosted git repository.
edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 6c1e453f53 Merge branch '2.1'
6c1e453f53 is described below
commit 6c1e453f5399dc21bf9889bb78e0ff541ad95878
Author: Ed Coleman <[email protected]>
AuthorDate: Wed Apr 24 18:47:12 2024 +0000
Merge branch '2.1'
- excludes changes to the grep iterator and grep shell command.
---
core/pom.xml | 1 +
.../org/apache/accumulo/core/conf/Property.java | 11 +-
.../core/metrics/MeterRegistryFactory.java | 5 +
.../apache/accumulo/core/metrics/MetricsInfo.java | 114 ++++++++
.../accumulo/core/metrics/MetricsProducer.java | 43 ---
.../apache/accumulo/core/metrics/MetricsUtil.java | 132 ---------
.../spi/metrics/LoggingMeterRegistryFactory.java | 81 ++++++
.../core/spi/metrics/MeterRegistryFactory.java | 72 +++++
.../accumulo/core/util/threads/ThreadPools.java | 38 ++-
.../metrics/LoggingMeterRegistryFactoryTest.java | 52 ++++
.../miniclusterImpl/MiniAccumuloConfigImpl.java | 3 +
.../org/apache/accumulo/server/AbstractServer.java | 9 +-
.../org/apache/accumulo/server/ServerContext.java | 13 +
.../server/compaction/PausedCompactionMetrics.java | 5 +-
.../conf/store/impl/PropCacheCaffeineImpl.java | 18 +-
.../server/conf/store/impl/PropStoreMetrics.java | 92 ------
.../server/conf/store/impl/ZooPropLoader.java | 19 +-
.../server/conf/store/impl/ZooPropStore.java | 15 +-
.../server/metrics/MeterRegistryEnvPropImpl.java | 75 +++++
.../accumulo/server/metrics/MetricsInfoImpl.java | 319 +++++++++++++++++++++
.../apache/accumulo/server/rpc/TServerUtils.java | 11 +-
.../apache/accumulo/server/rpc/TimedProcessor.java | 6 +-
.../conf/store/impl/PropCacheCaffeineImplTest.java | 6 +-
.../server/conf/store/impl/PropStoreEventTest.java | 8 +-
.../server/conf/store/impl/ZooPropLoaderTest.java | 78 ++---
.../metrics/MeterRegistryEnvPropImplTest.java | 50 ++++
.../server/metrics/MetricsInfoImplTest.java | 84 ++++++
.../accumulo/server/rpc/TServerUtilsTest.java | 19 +-
.../coordinator/CompactionCoordinator.java | 15 +-
.../org/apache/accumulo/compactor/Compactor.java | 18 +-
.../apache/accumulo/gc/SimpleGarbageCollector.java | 19 +-
.../java/org/apache/accumulo/manager/Manager.java | 20 +-
.../accumulo/manager/metrics/ManagerMetrics.java | 24 +-
.../accumulo/manager/metrics/fate/FateMetrics.java | 49 ++--
.../java/org/apache/accumulo/monitor/Monitor.java | 6 +
.../org/apache/accumulo/tserver/ScanServer.java | 19 +-
.../org/apache/accumulo/tserver/TabletServer.java | 34 +--
.../tserver/TabletServerResourceManager.java | 35 +--
.../tserver/metrics/TabletServerMinCMetrics.java | 8 +-
.../tserver/metrics/TabletServerScanMetrics.java | 24 +-
.../accumulo/tserver/tablet/CompactableImpl.java | 9 +
.../test/conf/store/PropCacheCaffeineImplZkIT.java | 10 +-
.../accumulo/test/functional/ZombieTServer.java | 2 +-
.../apache/accumulo/test/metrics/MetricsIT.java | 16 +-
.../test/metrics/TestStatsDRegistryFactory.java | 6 +-
.../accumulo/test/performance/NullTserver.java | 2 +-
test/src/main/resources/log4j2-test.properties | 23 --
47 files changed, 1109 insertions(+), 609 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 05d9a02c69..5d70035fab 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -271,6 +271,7 @@
</includes>
<excludes />
<allows>
+
<allow>io[.]micrometer[.]core[.]instrument[.]MeterRegistry</allow>
<allow>io[.]opentelemetry[.]api[.]OpenTelemetry</allow>
<allow>org[.]apache[.]hadoop[.]io[.]Text</allow>
<allow>org[.]apache[.]accumulo[.]core[.]client[.].*</allow>
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 7d48c05ae1..6dc56205ba 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -322,8 +322,15 @@ public enum Property {
"Enables metrics functionality using Micrometer.", "2.1.0"),
GENERAL_MICROMETER_JVM_METRICS_ENABLED("general.micrometer.jvm.metrics.enabled",
"false",
PropertyType.BOOLEAN, "Enables JVM metrics functionality using
Micrometer.", "2.1.0"),
- GENERAL_MICROMETER_FACTORY("general.micrometer.factory", "",
PropertyType.CLASSNAME,
- "Name of class that implements MeterRegistryFactory.", "2.1.0"),
+ GENERAL_MICROMETER_FACTORY("general.micrometer.factory",
+ "org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory",
+ PropertyType.CLASSNAMELIST,
+ "A comma separated list of one or more class names that implements"
+ + " org.apache.accumulo.core.spi.metrics.MeterRegistryFactory. Prior
to"
+ + " 2.1.3 this was a single value and the default was an empty
string. In 2.1.3 the default"
+ + " was changed and it now can accept multiple class names. The
metrics spi was introduced in 2.1.3,"
+ + " the deprecated factory is
org.apache.accumulo.core.metrics.MeterRegistryFactory.",
+ "2.1.0"),
GENERAL_PROCESS_BIND_ADDRESS("general.process.bind.addr", "0.0.0.0",
PropertyType.STRING,
"The local IP address to which this server should bind for sending and
receiving network traffic.",
"3.0.0"),
diff --git
a/core/src/main/java/org/apache/accumulo/core/metrics/MeterRegistryFactory.java
b/core/src/main/java/org/apache/accumulo/core/metrics/MeterRegistryFactory.java
index acb4c4cbd2..e6fe10356c 100644
---
a/core/src/main/java/org/apache/accumulo/core/metrics/MeterRegistryFactory.java
+++
b/core/src/main/java/org/apache/accumulo/core/metrics/MeterRegistryFactory.java
@@ -20,6 +20,11 @@ package org.apache.accumulo.core.metrics;
import io.micrometer.core.instrument.MeterRegistry;
+/**
+ * @deprecated since 2.1.3; use {@link
org.apache.accumulo.core.spi.metrics.MeterRegistryFactory}
+ * instead
+ */
+@Deprecated()
public interface MeterRegistryFactory {
MeterRegistry create();
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java
new file mode 100644
index 0000000000..b6ef72b1df
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.metrics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.net.HostAndPort;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+
+public interface MetricsInfo {
+
+ /**
+ * Convenience method to create tag name / value pair for the instance name
+ *
+ * @param instanceName the instance name
+ */
+ static Tag instanceNameTag(final String instanceName) {
+ Objects.requireNonNull(instanceName,
+ "cannot create the tag without providing the instance name");
+ return Tag.of("instance.name", instanceName);
+ }
+
+ /**
+ * Convenience method to create tag name / value pair for the process name
+ *
+ * @param processName the process name
+ */
+ static Tag processTag(final String processName) {
+ Objects.requireNonNull(processName, "cannot create the tag without
providing the process name");
+ return Tag.of("process.name", processName);
+ }
+
+ /**
+ * Convenience method to create tag name / value pairs for the host and port
from address
+ * host:port pair.
+ *
+ * @param hostAndPort the host:port pair
+ */
+ static List<Tag> addressTags(final HostAndPort hostAndPort) {
+ Objects.requireNonNull(hostAndPort, "cannot create the tag without
providing the hostAndPort");
+ List<Tag> tags = new ArrayList<>(2);
+ tags.add(Tag.of("host", hostAndPort.getHost()));
+ int port = hostAndPort.getPort();
+ if (port != 0) {
+ tags.add(Tag.of("port", Integer.toString(hostAndPort.getPort())));
+ }
+ return Collections.unmodifiableList(tags);
+ }
+
+ boolean isMetricsEnabled();
+
+ /**
+ * Convenience method to set the common tags for application (process), host
and port.
+ *
+ * @param applicationName the application (process) name.
+ * @param hostAndPort the host:port pair
+ */
+ void addServiceTags(final String applicationName, final HostAndPort
hostAndPort);
+
+ /**
+ * Add the list of tag name / value pairs to the common tags that will be
emitted with all metrics.
+ * Common tags must be added before initialization of any registries. Tags
that are added after a
+ * registry is initialized may not be emitted by the underlying metrics
system. This would cause
+ * inconsistent grouping and filtering based on tags.
+ *
+ * @param updates list of tags (name / value pairs)
+ */
+ void addCommonTags(final List<Tag> updates);
+
+ /**
+ * Get the current list of common tags.
+ */
+ Collection<Tag> getCommonTags();
+
+ void addRegistry(MeterRegistry registry);
+
+ void addMetricsProducers(MetricsProducer... producer);
+
+ /**
+ * Initialize the metrics system. This sets the list of common tags that are
emitted with the
+ * metrics.
+ */
+ void init();
+
+ MeterRegistry getRegistry();
+
+ /**
+ * Close the underlying registry and release resources. The registry will
not accept new meters
+ * and will stop publishing metrics.
+ */
+ void close();
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 7aea9a7a6c..e50d2d9c03 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -556,42 +556,6 @@ import io.micrometer.core.instrument.MeterRegistry;
* <td>Distribution Summary</td>
* <td></td>
* </tr>
- * <!-- ZooKeeper property cache -->
- * <tr>
- * <td>N/A</td>
- * <td>N/A</td>
- * <td>{@value #METRICS_PROPSTORE_LOAD_TIMER}</td>
- * <td>Timer</td>
- * <td></td>
- * </tr>
- * <tr>
- * <td>N/A</td>
- * <td>N/A</td>
- * <td>{@value #METRICS_PROPSTORE_REFRESH_COUNT}</td>
- * <td>Counter</td>
- * <td></td>
- * </tr>
- * <tr>
- * <td>N/A</td>
- * <td>N/A</td>
- * <td>{@value #METRICS_PROPSTORE_REFRESH_LOAD_COUNT}</td>
- * <td>Counter</td>
- * <td></td>
- * </tr>
- * <tr>
- * <td>N/A</td>
- * <td>N/A</td>
- * <td>{@value #METRICS_PROPSTORE_EVICTION_COUNT}</td>
- * <td>Counter</td>
- * <td></td>
- * </tr>
- * <tr>
- * <td>N/A</td>
- * <td>N/A</td>
- * <td>{@value #METRICS_PROPSTORE_ZK_ERROR_COUNT}</td>
- * <td>Counter</td>
- * <td></td>
- * </tr>
* </table>
*
* @since 2.1.0
@@ -683,13 +647,6 @@ public interface MetricsProducer {
String METRICS_UPDATE_WALOG_WRITE = METRICS_UPDATE_PREFIX + "walog.write";
String METRICS_UPDATE_MUTATION_ARRAY_SIZE = METRICS_UPDATE_PREFIX +
"mutation.arrays.size";
- String METRICS_PROPSTORE_PREFIX = "accumulo.prop.store.";
- String METRICS_PROPSTORE_LOAD_TIMER = METRICS_PROPSTORE_PREFIX + "load";
- String METRICS_PROPSTORE_REFRESH_COUNT = METRICS_PROPSTORE_PREFIX +
"refresh";
- String METRICS_PROPSTORE_REFRESH_LOAD_COUNT = METRICS_PROPSTORE_PREFIX +
"refresh.load";
- String METRICS_PROPSTORE_EVICTION_COUNT = METRICS_PROPSTORE_PREFIX +
"evictions";
- String METRICS_PROPSTORE_ZK_ERROR_COUNT = METRICS_PROPSTORE_PREFIX +
"zookeeper.error";
-
/**
* Build Micrometer Meter objects and register them with the registry
*/
diff --git
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java
deleted file mode 100644
index fe24c93899..0000000000
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.core.metrics;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.accumulo.core.classloader.ClassLoaderUtil;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.Property;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.net.HostAndPort;
-
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Metrics;
-import io.micrometer.core.instrument.Tag;
-import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
-import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
-import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
-import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
-import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
-import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
-
-public class MetricsUtil {
-
- private static final Logger LOG = LoggerFactory.getLogger(MetricsUtil.class);
-
- private static JvmGcMetrics gc;
- private static List<Tag> commonTags;
-
- public static void initializeMetrics(final AccumuloConfiguration conf, final
String appName,
- final HostAndPort address, final String instanceName) throws
ClassNotFoundException,
- InstantiationException, IllegalAccessException, IllegalArgumentException,
- InvocationTargetException, NoSuchMethodException, SecurityException {
- initializeMetrics(conf.getBoolean(Property.GENERAL_MICROMETER_ENABLED),
- conf.getBoolean(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED),
- conf.get(Property.GENERAL_MICROMETER_FACTORY), appName, address,
instanceName);
- }
-
- private static void initializeMetrics(boolean enabled, boolean
jvmMetricsEnabled,
- String factoryClass, String appName, HostAndPort address, String
instanceName)
- throws ClassNotFoundException, InstantiationException,
IllegalAccessException,
- IllegalArgumentException, InvocationTargetException,
NoSuchMethodException,
- SecurityException {
-
- LOG.info("initializing metrics, enabled:{}, class:{}", enabled,
factoryClass);
-
- if (enabled && factoryClass != null && !factoryClass.isEmpty()) {
-
- String processName = appName;
- String serviceInstance =
System.getProperty("accumulo.metrics.service.instance", "");
- if (!serviceInstance.isBlank()) {
- processName += serviceInstance;
- }
-
- List<Tag> tags = new ArrayList<>();
- tags.add(Tag.of("instance.name", instanceName));
- tags.add(Tag.of("process.name", processName));
-
- if (address != null) {
- if (!address.getHost().isEmpty()) {
- tags.add(Tag.of("host", address.getHost()));
- }
- if (address.getPort() > 0) {
- tags.add(Tag.of("port", Integer.toString(address.getPort())));
- }
- }
-
- commonTags = Collections.unmodifiableList(tags);
-
- Class<? extends MeterRegistryFactory> clazz =
- ClassLoaderUtil.loadClass(factoryClass, MeterRegistryFactory.class);
- MeterRegistryFactory factory =
clazz.getDeclaredConstructor().newInstance();
-
- MeterRegistry registry = factory.create();
- registry.config().commonTags(commonTags);
- Metrics.addRegistry(registry);
-
- if (jvmMetricsEnabled) {
- new ClassLoaderMetrics(commonTags).bindTo(Metrics.globalRegistry);
- new JvmMemoryMetrics(commonTags).bindTo(Metrics.globalRegistry);
- gc = new JvmGcMetrics(commonTags);
- gc.bindTo(Metrics.globalRegistry);
- new ProcessorMetrics(commonTags).bindTo(Metrics.globalRegistry);
- new JvmThreadMetrics(commonTags).bindTo(Metrics.globalRegistry);
- }
- }
- }
-
- public static void initializeProducers(MetricsProducer... producer) {
- for (MetricsProducer p : producer) {
- p.registerMetrics(Metrics.globalRegistry);
- LOG.info("Metric producer {} initialize", p.getClass().getSimpleName());
- }
- }
-
- public static void addExecutorServiceMetrics(ExecutorService executor,
String name) {
- new ExecutorServiceMetrics(executor, name,
commonTags).bindTo(Metrics.globalRegistry);
- }
-
- public static List<Tag> getCommonTags() {
- return commonTags;
- }
-
- public static void close() {
- if (gc != null) {
- gc.close();
- }
- }
-
-}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactory.java
b/core/src/main/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactory.java
new file mode 100644
index 0000000000..2e24726e86
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
+import io.micrometer.core.instrument.logging.LoggingRegistryConfig;
+
+/**
+ * Example implementation of enabling a metrics provider by implementing the
+ * {@link org.apache.accumulo.core.spi.metrics.MeterRegistryFactory}
interface. It is enabled through
+ * properties by setting {@code Property.GENERAL_MICROMETER_ENABLED} and
providing this class for
+ * the {@code Property.GENERAL_MICROMETER_FACTORY}.
+ * <p>
+ * The metrics will appear in the normal service logs with a named logger of
+ * {@code org.apache.accumulo.METRICS} at the INFO level. The metrics output
can be directed to a
+ * file using standard logging configuration properties by configuring the
log4j2-service.properties
+ * file.
+ * <p>
+ * Properties can be passed in the Accumulo properties files using the prefix
+ * {@code general.custom.metrics.opts}
+ * <p>
+ * For example, the default polling rate is 60 sec. To modify the update
frequency set
+ * {@code general.custom.metrics.opts.logging.step} in the Accumulo
configuration.
+ *
+ * <pre>
+ * general.custom.metrics.opts.logging.step = 10s
+ * </pre>
+ */
+public class LoggingMeterRegistryFactory implements MeterRegistryFactory {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LoggingMeterRegistryFactory.class);
+
+ // named logger that can be configured using standard logging properties.
+ private static final Logger METRICS =
LoggerFactory.getLogger("org.apache.accumulo.METRICS");
+
+ public LoggingMeterRegistryFactory() {
+ // needed for classloader
+ }
+
+ @Override
+ public MeterRegistry create(final InitParameters params) {
+ final Consumer<String> metricConsumer = METRICS::info;
+ final Map<String,String> metricsProps = new HashMap<>();
+
+ // defines the metrics update period, default is 60 seconds.
+ final LoggingRegistryConfig lconf = c -> {
+ if (c.equals("logging.step")) {
+ return metricsProps.getOrDefault("logging.step", "60s");
+ }
+ return null;
+ };
+
+ LOG.info("Creating logging metrics registry with params: {}", params);
+ metricsProps.putAll(params.getOptions());
+ return
LoggingMeterRegistry.builder(lconf).loggingSink(metricConsumer).build();
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/metrics/MeterRegistryFactory.java
b/core/src/main/java/org/apache/accumulo/core/spi/metrics/MeterRegistryFactory.java
new file mode 100644
index 0000000000..3e57cc200a
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/spi/metrics/MeterRegistryFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.metrics;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+
+import io.micrometer.core.instrument.MeterRegistry;
+
+/**
+ * Micrometer metrics allow for different monitoring systems and can be
enabled within
+ * Accumulo with properties. They are initialized by implementing this interface
and providing the
+ * factory implementation class name as a property. Metrics are specified with
the following
+ * properties:
+ * <p>
+ * Property.GENERAL_MICROMETER_ENABLED = true
+ * <p>
+ * Property.GENERAL_MICROMETER_FACTORY = [implementation].class.getName()
+ *
+ * @since 2.1.3
+ */
+public interface MeterRegistryFactory {
+ // full form in property file is "general.custom.metrics.opts"
+ String METRICS_PROP_SUBSTRING = "metrics.opts.";
+
+ interface InitParameters {
+ /**
+ * Get the configured metrics properties passed as {@code
general.custom.metrics.opts}. The
+ * returned map contains the property names with {@code
general.custom.metrics.opts} removed.
+ * <p>
+ * For example properties {@code general.custom.metrics.opts.prop1=abc} and
+ * {@code general.custom.metrics.opts.prop9=123} are set, then this map
would contain
+ * {@code prop1=abc} and {@code prop9=123}.
+ *
+ * @return a map of property name, value pairs, stripped of a prefix.
+ */
+ Map<String,String> getOptions();
+
+ /**
+ * Optional extension point to pass additional information through the
ServiceEnvironment.
+ *
+ * @return the service environment
+ */
+ ServiceEnvironment getServiceEnv();
+ }
+
+ /**
+ * Called on metrics initialization. Implementations should note the initial
parameters set when
+ * instantiating a MeterRegistry should be considered fixed. Once a
MeterRegistry is initialized
+ * parameters such as common tags may not be updated with later additions or
changes.
+ *
+ * @return a Micrometer registry that will be added to the metrics
configuration.
+ */
+ MeterRegistry create(final InitParameters params);
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 334fb46a53..324fafebc8 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
@@ -42,7 +43,6 @@ import java.util.function.IntSupplier;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.trace.TraceUtil;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
@@ -51,6 +51,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
@SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN",
justification = "Throwing Error for it to be caught by
AccumuloUncaughtExceptionHandler")
@@ -439,16 +441,30 @@ public class ThreadPools {
}
/**
- * When set to true will emit metrics and register the metrics in a static
registry. After the
- * thread pool is deleted, there will still be metrics objects related to
it in the static
- * registry. There is no way to clean these leftover objects up therefore
its recommended that
- * this option only be set true for long-lived thread pools. Creating lots
of short-lived thread
- * pools and registering them can lead to out of memory errors over long
time periods.
+ * When set to true will emit metrics and register the metrics in a
registry. After the thread
+ * pool is deleted, there will still be metrics objects related to it in
the static registry.
+ * There is no way to clean these leftover objects up therefore it's
recommended that this option
+ * only be set true for long-lived thread pools. Creating lots of
short-lived thread pools and
+ * registering them can lead to out of memory errors over long time
periods.
*
* @return a fluent-style builder instance
*/
public ThreadPoolExecutorBuilder enableThreadPoolMetrics() {
- this.emitThreadPoolMetrics = true;
+ return enableThreadPoolMetrics(true);
+ }
+
+ /**
+ * Optionally set to register pool metrics. When set to true will emit
metrics and register the
+ * metrics in a registry. After the thread pool is deleted, there will
still be metrics objects
+ * related to it in the static registry. There is no way to clean these
leftover objects up
+ * therefore it's recommended that this option only be set true for
long-lived thread pools.
+ * Creating lots of short-lived thread pools and registering them can lead
to out of memory
+ * errors over long time periods.
+ *
+ * @return a fluent-style builder instance
+ */
+ public ThreadPoolExecutorBuilder enableThreadPoolMetrics(final boolean
enable) {
+ this.emitThreadPoolMetrics = enable;
return this;
}
}
@@ -513,7 +529,7 @@ public class ThreadPools {
result.allowCoreThreadTimeOut(true);
}
if (emitThreadPoolMetrics) {
- MetricsUtil.addExecutorServiceMetrics(result, name);
+ ThreadPools.addExecutorServiceMetrics(result, name);
}
return result;
}
@@ -618,9 +634,13 @@ public class ThreadPools {
};
if (emitThreadPoolMetrics) {
- MetricsUtil.addExecutorServiceMetrics(result, name);
+ ThreadPools.addExecutorServiceMetrics(result, name);
}
return result;
}
+ private static void addExecutorServiceMetrics(ExecutorService executor,
String name) {
+ new ExecutorServiceMetrics(executor, name,
List.of()).bindTo(Metrics.globalRegistry);
+ }
+
}
diff --git
a/core/src/test/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactoryTest.java
b/core/src/test/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactoryTest.java
new file mode 100644
index 0000000000..9d628d2775
--- /dev/null
+++
b/core/src/test/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactoryTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.metrics;
+
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.junit.jupiter.api.Test;
+
+import io.micrometer.core.instrument.logging.LoggingMeterRegistry;
+
+class LoggingMeterRegistryFactoryTest {
+
+ @Test
+ public void createTest() {
+ LoggingMeterRegistryFactory factory = new LoggingMeterRegistryFactory();
+ var reg = factory.create(new LoggingMetricsParams());
+ assertInstanceOf(LoggingMeterRegistry.class, reg);
+ }
+
+ private static class LoggingMetricsParams implements
MeterRegistryFactory.InitParameters {
+
+ @Override
+ public Map<String,String> getOptions() {
+ // note: general.custom.metrics.opts. is expected to be stripped before
passing the options.
+ return Map.of("prop1", "abc", "logging.step", "1s");
+ }
+
+ @Override
+ public ServiceEnvironment getServiceEnv() {
+ return null;
+ }
+ }
+}
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index efa6343868..b133280b84 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -147,6 +147,9 @@ public class MiniAccumuloConfigImpl {
mergeProp(Property.INSTANCE_SECRET.getKey(), DEFAULT_INSTANCE_SECRET);
}
+ // enable metrics reporting - by default will appear in standard log
files.
+ mergeProp(Property.GENERAL_MICROMETER_ENABLED.getKey(), "true");
+
mergeProp(Property.TSERV_PORTSEARCH.getKey(), "true");
mergeProp(Property.TSERV_DATACACHE_SIZE.getKey(), "10M");
mergeProp(Property.TSERV_INDEXCACHE_SIZE.getKey(), "10M");
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index 8966687983..bd66689d0f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -28,7 +28,6 @@ import org.apache.accumulo.core.cli.ConfigOpts;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.metrics.MetricsProducer;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.mem.LowMemoryDetector;
@@ -116,9 +115,11 @@ public abstract class AbstractServer implements
AutoCloseable, MetricsProducer,
return getContext().getConfiguration();
}
- @Override
- public void close() {
- MetricsUtil.close();
+ public String getApplicationName() {
+ return applicationName;
}
+ @Override
+ public void close() {}
+
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index e58645d00b..bf75821828 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -53,6 +53,7 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.singletons.SingletonReservation;
import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory;
@@ -67,6 +68,7 @@ import
org.apache.accumulo.server.conf.store.impl.ZooPropStore;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.mem.LowMemoryDetector;
import org.apache.accumulo.server.metadata.ServerAmpleImpl;
+import org.apache.accumulo.server.metrics.MetricsInfoImpl;
import org.apache.accumulo.server.rpc.SaslServerConnectionParams;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -103,6 +105,7 @@ public class ServerContext extends ClientContext {
private final Supplier<AuditedSecurityOperation> securityOperation;
private final Supplier<CryptoServiceFactory> cryptoFactorySupplier;
private final Supplier<LowMemoryDetector> lowMemoryDetector;
+ private final Supplier<MetricsInfo> metricsInfoSupplier;
public ServerContext(SiteConfiguration siteConfig) {
this(new ServerInfo(siteConfig));
@@ -129,6 +132,7 @@ public class ServerContext extends ClientContext {
memoize(() -> new AuditedSecurityOperation(this,
SecurityOperation.getAuthorizor(this),
SecurityOperation.getAuthenticator(this),
SecurityOperation.getPermHandler(this)));
lowMemoryDetector = memoize(() -> new LowMemoryDetector());
+ metricsInfoSupplier = memoize(() -> new MetricsInfoImpl(this));
}
/**
@@ -459,4 +463,13 @@ public class ServerContext extends ClientContext {
return lowMemoryDetector.get();
}
+ /**
+ * Returns the {@link MetricsInfo} obtained from this context's metrics supplier.
+ */
+ public MetricsInfo getMetricsInfo() {
+ return metricsInfoSupplier.get();
+ }
+
+  /**
+   * Closes the metrics registry held by this context and then the parent client context.
+   * <p>
+   * The parent close runs in a {@code finally} block so that a failure while shutting down
+   * metrics cannot prevent the client resources from being released.
+   */
+  @Override
+  public void close() {
+    try {
+      getMetricsInfo().close();
+    } finally {
+      super.close();
+    }
+  }
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
index b8731cda08..3d1a07be42 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java
@@ -19,7 +19,6 @@
package org.apache.accumulo.server.compaction;
import org.apache.accumulo.core.metrics.MetricsProducer;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
@@ -40,9 +39,9 @@ public class PausedCompactionMetrics implements
MetricsProducer {
@Override
public void registerMetrics(MeterRegistry registry) {
majcPauses = Counter.builder(METRICS_MAJC_PAUSED).description("major
compaction pause count")
- .tags(MetricsUtil.getCommonTags()).register(registry);
+ .register(registry);
mincPauses = Counter.builder(METRICS_MINC_PAUSED).description("minor
compactor pause count")
- .tags(MetricsUtil.getCommonTags()).register(registry);
+ .register(registry);
}
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
index 5c42d0c0a3..d55c9465b1 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
@@ -48,13 +48,10 @@ public class PropCacheCaffeineImpl implements PropCache {
ThreadPools.getServerThreadPools().getPoolBuilder("caffeine-tasks").numCoreThreads(1)
.numMaxThreads(20).withTimeOut(60L, SECONDS).build();
- private final PropStoreMetrics metrics;
-
private final LoadingCache<PropStoreKey<?>,VersionedProperties> cache;
private PropCacheCaffeineImpl(final
CacheLoader<PropStoreKey<?>,VersionedProperties> cacheLoader,
- final PropStoreMetrics metrics, final Ticker ticker, boolean
runTasksInline) {
- this.metrics = metrics;
+ final Ticker ticker, boolean runTasksInline) {
var builder = Caffeine.newBuilder().expireAfterAccess(EXPIRE_MIN,
BASE_TIME_UNITS)
.evictionListener(this::evictionNotifier);
if (runTasksInline) {
@@ -68,14 +65,9 @@ public class PropCacheCaffeineImpl implements PropCache {
cache = builder.build(cacheLoader);
}
- public PropStoreMetrics getMetrics() {
- return metrics;
- }
-
void evictionNotifier(PropStoreKey<?> propStoreKey, VersionedProperties
value,
RemovalCause cause) {
log.trace("Evicted: ID: {} was evicted from cache. Reason: {}",
propStoreKey, cause);
- metrics.incrEviction();
}
@Override
@@ -85,7 +77,6 @@ public class PropCacheCaffeineImpl implements PropCache {
return cache.get(propStoreKey);
} catch (Exception ex) {
log.info("Cache failed to retrieve properties for: " + propStoreKey, ex);
- metrics.incrZkError();
return null;
}
}
@@ -116,20 +107,17 @@ public class PropCacheCaffeineImpl implements PropCache {
}
public static class Builder {
-
- private final PropStoreMetrics metrics;
private final ZooPropLoader zooPropLoader;
private Ticker ticker = null;
private boolean runTasksInline = false;
- public Builder(final ZooPropLoader zooPropLoader, final PropStoreMetrics
metrics) {
+ public Builder(final ZooPropLoader zooPropLoader) {
Objects.requireNonNull(zooPropLoader, "A PropStoreChangeMonitor must be
provided");
this.zooPropLoader = zooPropLoader;
- this.metrics = metrics;
}
public PropCacheCaffeineImpl build() {
- return new PropCacheCaffeineImpl(zooPropLoader, metrics, ticker,
runTasksInline);
+ return new PropCacheCaffeineImpl(zooPropLoader, ticker, runTasksInline);
}
public Builder forTests(final Ticker ticker) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreMetrics.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreMetrics.java
deleted file mode 100644
index a85c14eec2..0000000000
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreMetrics.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.server.conf.store.impl;
-
-import java.time.Duration;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.accumulo.core.metrics.MetricsProducer;
-import org.apache.accumulo.core.metrics.MetricsUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.micrometer.core.instrument.Counter;
-import io.micrometer.core.instrument.MeterRegistry;
-import io.micrometer.core.instrument.Timer;
-
-public class PropStoreMetrics implements MetricsProducer {
-
- private static final Logger log =
LoggerFactory.getLogger(PropStoreMetrics.class);
-
- private Timer load;
- private Counter refresh;
- private Counter refreshLoad;
- private Counter eviction;
- private Counter zkError;
-
- @Override
- public void registerMetrics(MeterRegistry registry) {
-
- load = Timer.builder(METRICS_PROPSTORE_LOAD_TIMER).description("prop store
load time")
- .tags(MetricsUtil.getCommonTags()).register(registry);
-
- refresh =
- Counter.builder(METRICS_PROPSTORE_REFRESH_COUNT).description("prop
store refresh count")
- .tags(MetricsUtil.getCommonTags()).register(registry);
-
- refreshLoad = Counter.builder(METRICS_PROPSTORE_REFRESH_LOAD_COUNT)
- .description("prop store refresh load
count").tags(MetricsUtil.getCommonTags())
- .register(registry);
-
- eviction =
- Counter.builder(METRICS_PROPSTORE_EVICTION_COUNT).description("prop
store eviction count")
- .tags(MetricsUtil.getCommonTags()).register(registry);
-
- zkError = Counter.builder(METRICS_PROPSTORE_ZK_ERROR_COUNT)
- .description("prop store ZooKeeper error
count").tags(MetricsUtil.getCommonTags())
- .register(registry);
-
- }
-
- public PropStoreMetrics() {
- log.debug("Creating PropStore metrics");
- }
-
- public void addLoadTime(final long value) {
- log.trace("Load time: {}", value);
- load.record(Duration.ofMillis(value));
- log.trace("Load count: {} time:{}", load.count(),
load.totalTime(TimeUnit.MILLISECONDS));
- }
-
- public void incrRefresh() {
- refresh.increment();
- }
-
- public void incrRefreshLoad() {
- refreshLoad.increment();
- }
-
- public void incrEviction() {
- eviction.increment();
- }
-
- public void incrZkError() {
- zkError.increment();
- }
-}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java
index 48620f6a92..c5842e5b66 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java
@@ -22,7 +22,6 @@ import static java.util.Objects.requireNonNull;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
@@ -44,14 +43,12 @@ public class ZooPropLoader implements
CacheLoader<PropStoreKey<?>,VersionedPrope
private final VersionedPropCodec propCodec;
// used to set watcher, does not react to events.
private final PropStoreWatcher propStoreWatcher;
- private final PropStoreMetrics metrics;
public ZooPropLoader(final ZooReaderWriter zrw, final VersionedPropCodec
propCodec,
- final PropStoreWatcher propStoreWatcher, final PropStoreMetrics metrics)
{
+ final PropStoreWatcher propStoreWatcher) {
this.zrw = zrw;
this.propCodec = propCodec;
this.propStoreWatcher = propStoreWatcher;
- this.metrics = metrics;
}
@Override
@@ -59,26 +56,18 @@ public class ZooPropLoader implements
CacheLoader<PropStoreKey<?>,VersionedPrope
try {
log.trace("load called for {}", propStoreKey);
- long startNanos = System.nanoTime();
-
Stat stat = new Stat();
byte[] bytes = zrw.getData(propStoreKey.getPath(), propStoreWatcher,
stat);
if (stat.getDataLength() == 0) {
return new VersionedProperties();
}
VersionedProperties vProps = propCodec.fromBytes(stat.getVersion(),
bytes);
-
- metrics.addLoadTime(
- TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanos,
TimeUnit.NANOSECONDS));
-
return vProps;
} catch (KeeperException.NoNodeException ex) {
- metrics.incrZkError();
log.debug("property node for {} does not exist - it may be being
created", propStoreKey);
propStoreWatcher.signalZkChangeEvent(propStoreKey);
return null;
} catch (Exception ex) {
- metrics.incrZkError();
log.info("Failed to load properties for: {} from ZooKeeper, returning
null", propStoreKey,
ex);
propStoreWatcher.signalZkChangeEvent(propStoreKey);
@@ -97,8 +86,6 @@ public class ZooPropLoader implements
CacheLoader<PropStoreKey<?>,VersionedPrope
public CompletableFuture<VersionedProperties> asyncReload(PropStoreKey<?>
propStoreKey,
VersionedProperties oldValue, Executor executor) throws Exception {
log.trace("asyncReload called for key: {}", propStoreKey);
- metrics.incrRefresh();
-
return CompletableFuture.supplyAsync(() ->
loadIfDifferentVersion(propStoreKey, oldValue),
executor);
}
@@ -107,7 +94,6 @@ public class ZooPropLoader implements
CacheLoader<PropStoreKey<?>,VersionedPrope
public @Nullable VersionedProperties reload(PropStoreKey<?> propStoreKey,
VersionedProperties oldValue) throws Exception {
log.trace("reload called for: {}", propStoreKey);
- metrics.incrRefresh();
return loadIfDifferentVersion(propStoreKey, oldValue);
}
@@ -141,8 +127,6 @@ public class ZooPropLoader implements
CacheLoader<PropStoreKey<?>,VersionedPrope
var updatedValue = load(propCacheId);
- metrics.incrRefreshLoad();
-
// The cache will be updated - notify external listeners value changed.
propStoreWatcher.signalCacheChangeEvent(propCacheId);
log.trace("Updated value {}", updatedValue == null ? "null" :
updatedValue.print(true));
@@ -150,7 +134,6 @@ public class ZooPropLoader implements
CacheLoader<PropStoreKey<?>,VersionedPrope
} catch (RuntimeException | KeeperException | InterruptedException ex) {
log.warn("async exception occurred reading properties from ZooKeeper
for: {} returning null",
propCacheId, ex);
- metrics.incrZkError();
propStoreWatcher.signalZkChangeEvent(propCacheId);
return null;
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
index 881f394107..9f733e8495 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
@@ -31,7 +31,6 @@ import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
import org.apache.accumulo.server.conf.codec.VersionedProperties;
import org.apache.accumulo.server.conf.store.PropCache;
@@ -55,7 +54,6 @@ public class ZooPropStore implements PropStore,
PropChangeListener {
private final ZooReaderWriter zrw;
private final PropStoreWatcher propStoreWatcher;
private final PropCacheCaffeineImpl cache;
- private final PropStoreMetrics cacheMetrics = new PropStoreMetrics();
private final ReadyMonitor zkReadyMon;
/**
@@ -89,17 +87,14 @@ public class ZooPropStore implements PropStore,
PropChangeListener {
this.propStoreWatcher = requireNonNullElseGet(watcher, () -> new
PropStoreWatcher(zkReadyMon));
- ZooPropLoader propLoader = new ZooPropLoader(zrw, codec,
this.propStoreWatcher, cacheMetrics);
+ ZooPropLoader propLoader = new ZooPropLoader(zrw, codec,
this.propStoreWatcher);
if (ticker == null) {
- this.cache = new PropCacheCaffeineImpl.Builder(propLoader,
cacheMetrics).build();
+ this.cache = new PropCacheCaffeineImpl.Builder(propLoader).build();
} else {
- this.cache =
- new PropCacheCaffeineImpl.Builder(propLoader,
cacheMetrics).forTests(ticker).build();
+ this.cache = new
PropCacheCaffeineImpl.Builder(propLoader).forTests(ticker).build();
}
- MetricsUtil.initializeProducers(cacheMetrics);
-
try {
var path = ZooUtil.getRoot(instanceId);
if (zrw.exists(path, propStoreWatcher)) {
@@ -139,10 +134,6 @@ public class ZooPropStore implements PropStore,
PropChangeListener {
return false;
}
- public PropStoreMetrics getMetrics() {
- return cacheMetrics;
- }
-
@Override
public void create(PropStoreKey<?> propStoreKey, Map<String,String> props) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImpl.java
new file mode 100644
index 0000000000..f3b447defe
--- /dev/null
+++
b/server/base/src/main/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import static
org.apache.accumulo.core.conf.Property.GENERAL_ARBITRARY_PROP_PREFIX;
+import static
org.apache.accumulo.core.spi.metrics.MeterRegistryFactory.METRICS_PROP_SUBSTRING;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.metrics.MeterRegistryFactory;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
+
+/**
+ * Provides a way to pass parameters from an Accumulo configuration to the
MeterRegistryFactory.
+ * Properties need to have the form {@code general.custom.metrics.opts.PARAMETER_NAME = VALUE}.
+ * The prefix {@code general.custom.metrics.opts.} is stripped and the resulting Map returned by
+ * {@link #getOptions()} will be a map of {@code PARAMETER_NAME, VALUE} key value pairs.
+ * <p>
+ * Other classes can implement
+ * {@link org.apache.accumulo.core.spi.metrics.MeterRegistryFactory.InitParameters} to provide
+ * alternative implementations.
+ */
+public class MeterRegistryEnvPropImpl implements MeterRegistryFactory.InitParameters {
+
+  private final ServerContext context;
+
+  /**
+   * @param context the server context whose configuration and service environment are exposed
+   */
+  public MeterRegistryEnvPropImpl(final ServerContext context) {
+    this.context = context;
+  }
+
+  /**
+   * Properties that match {@code general.custom.metrics.opts.PARAMETER_NAME = VALUE} will be
+   * filtered and returned with the prefix stripped.
+   *
+   * @return a map of the filtered, stripped property, value pairs.
+   */
+  @Override
+  public Map<String,String> getOptions() {
+    final Map<String,String> customProps = context.getConfiguration()
+        .getAllPropertiesWithPrefixStripped(GENERAL_ARBITRARY_PROP_PREFIX);
+
+    final Map<String,String> metricsOptions = new HashMap<>();
+    for (Map.Entry<String,String> entry : customProps.entrySet()) {
+      final String key = entry.getKey();
+      // keep only the metrics options and drop the remaining metrics prefix from the key
+      if (key.startsWith(METRICS_PROP_SUBSTRING)) {
+        metricsOptions.put(key.substring(METRICS_PROP_SUBSTRING.length()), entry.getValue());
+      }
+    }
+    return metricsOptions;
+  }
+
+  /**
+   * @return a new service environment wrapping the server context
+   */
+  @Override
+  public ServiceEnvironment getServiceEnv() {
+    return new ServiceEnvironmentImpl(context);
+  }
+}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
new file mode 100644
index 0000000000..988bf38f63
--- /dev/null
+++
b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import static org.apache.hadoop.util.StringUtils.getTrimmedStrings;
+
+import java.lang.reflect.InvocationTargetException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.classloader.ClassLoaderUtil;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metrics.MetricsInfo;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HostAndPort;
+
+import io.micrometer.core.instrument.Meter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
+import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
+import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
+import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
+import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
+import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
+import io.micrometer.core.instrument.config.MeterFilter;
+import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
+
+public class MetricsInfoImpl implements MetricsInfo {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MetricsInfoImpl.class);
+
+ private final ServerContext context;
+
+ private final Lock lock = new ReentrantLock();
+
+ private final Map<String,Tag> commonTags;
+
+ // JvmGcMetrics are declared with AutoCloseable - keep reference to use with
close()
+ private JvmGcMetrics jvmGcMetrics;
+
+ private final boolean metricsEnabled;
+
+ private CompositeMeterRegistry composite = null;
+ private final List<MeterRegistry> pendingRegistries = new ArrayList<>();
+
+ private final List<MetricsProducer> producers = new ArrayList<>();
+
+  /**
+   * Reads the metrics-enabled flag from the configuration, logs the metrics settings and seeds
+   * the common tags with the instance name tag.
+   *
+   * @param context the server context providing configuration and instance name
+   */
+  public MetricsInfoImpl(final ServerContext context) {
+    this.context = context;
+    this.metricsEnabled =
+        context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_ENABLED);
+    printMetricsConfig();
+
+    this.commonTags = new HashMap<>();
+    final Tag instanceTag = MetricsInfo.instanceNameTag(context.getInstanceName());
+    this.commonTags.put(instanceTag.getKey(), instanceTag);
+  }
+
+  /**
+   * Logs whether micrometer metrics and detailed jvm metrics are enabled and, when metrics are
+   * enabled, the configured registry factories.
+   */
+  private void printMetricsConfig() {
+    final boolean jvmRequested =
+        context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED);
+    LOG.info("micrometer metrics enabled: {}", metricsEnabled);
+
+    if (!metricsEnabled) {
+      // jvm metrics need micrometer metrics - say so when only the jvm flag was set
+      if (jvmRequested) {
+        LOG.info("requested jvm metrics, but micrometer metrics are disabled.");
+      }
+      return;
+    }
+
+    if (jvmRequested) {
+      LOG.info("detailed jvm metrics enabled: {}", jvmRequested);
+    }
+    LOG.info("metrics registry factories: {}",
+        context.getConfiguration().get(Property.GENERAL_MICROMETER_FACTORY));
+  }
+
+ /**
+ * Reports the value of the micrometer enabled property as read from the configuration when
+ * this instance was constructed.
+ */
+ @Override
+ public boolean isMetricsEnabled() {
+ return metricsEnabled;
+ }
+
+  /**
+   * Adds the tags shared by all services: a process tag when an application name is supplied
+   * and address tags when a host and port is supplied. Missing values are skipped.
+   *
+   * @param applicationName the service name, may be null or empty
+   * @param hostAndPort the service address, may be null
+   */
+  @Override
+  public void addServiceTags(final String applicationName, final HostAndPort hostAndPort) {
+    final boolean hasName = applicationName != null && !applicationName.isEmpty();
+
+    final List<Tag> serviceTags = new ArrayList<>();
+    if (hasName) {
+      serviceTags.add(MetricsInfo.processTag(applicationName));
+    }
+    if (hostAndPort != null) {
+      serviceTags.addAll(MetricsInfo.addressTags(hostAndPort));
+    }
+    addCommonTags(serviceTags);
+  }
+
+  /**
+   * Merges the supplied tags into the common tags, replacing any existing tag with the same
+   * key. Once the composite registry exists the update is not applied and a warning is logged.
+   *
+   * @param updates the tags to add
+   */
+  @Override
+  public void addCommonTags(List<Tag> updates) {
+    lock.lock();
+    try {
+      if (composite == null) {
+        for (Tag tag : updates) {
+          commonTags.put(tag.getKey(), tag);
+        }
+      } else {
+        LOG.warn(
+            "Common tags after registry has been initialized may be ignored. Current common tags: {}",
+            commonTags);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns an unmodifiable snapshot of the current common tags.
+   * <p>
+   * A copy is taken while holding the lock; handing out a live view of the backing map would
+   * let callers iterate it outside the lock while {@link #addCommonTags(List)} modifies it.
+   *
+   * @return the common tags at the time of the call
+   */
+  @Override
+  public Collection<Tag> getCommonTags() {
+    lock.lock();
+    try {
+      return Collections.unmodifiableList(new ArrayList<>(commonTags.values()));
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Adds a registry to the composite registry, or queues it until the composite has been
+   * created by {@link #init()}.
+   *
+   * @param registry the registry to add
+   */
+  @Override
+  public void addRegistry(MeterRegistry registry) {
+    lock.lock();
+    try {
+      if (composite == null) {
+        // defer until composite is initialized
+        pendingRegistries.add(registry);
+      } else {
+        composite.add(registry);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Registers the producers with the composite registry, or queues them until the composite
+   * has been created by {@link #init()}. Calling with no producers only logs a debug message.
+   *
+   * @param producer the producers to add
+   */
+  @Override
+  public void addMetricsProducers(MetricsProducer... producer) {
+    if (producer.length == 0) {
+      LOG.debug(
+          "called addMetricsProducers() without providing at least one producer - this has no effect");
+      return;
+    }
+    lock.lock();
+    try {
+      if (composite != null) {
+        for (MetricsProducer metricsProducer : producer) {
+          metricsProducer.registerMetrics(composite);
+        }
+      } else {
+        producers.addAll(Arrays.asList(producer));
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns the composite registry created by {@link #init()}.
+   *
+   * @return the composite meter registry, never null
+   * @throws IllegalStateException if {@link #init()} has not been called
+   */
+  @Override
+  public MeterRegistry getRegistry() {
+    lock.lock();
+    try {
+      if (composite == null) {
+        throw new IllegalStateException("metrics have not been initialized, call init() first");
+      }
+      // return while still holding the lock so the checked value is the one handed out
+      return composite;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+ /**
+ * Creates the composite registry, applies the common tags, optionally binds the jvm metrics,
+ * adds the configured and pending registries, registers the queued producers and finally adds
+ * the composite to the micrometer global registry. A second call only logs a warning.
+ */
+ @Override
+ public void init() {
+ lock.lock();
+ try {
+ if (composite != null) {
+ LOG.warn("metrics registry has already been initialized");
+ return;
+ }
+ composite = new CompositeMeterRegistry();
+ composite.config().commonTags(commonTags.values());
+
+ LOG.info("Metrics initialization. common tags: {}", commonTags);
+
+ boolean jvmMetricsEnabled =
+
context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED);
+
+ if (jvmMetricsEnabled) {
+ LOG.info("enabling detailed jvm, classloader, jvm gc and process
metrics");
+ new ClassLoaderMetrics().bindTo(composite);
+ new JvmMemoryMetrics().bindTo(composite);
+ // reference is kept so close() can release the gc metrics binder
+ jvmGcMetrics = new JvmGcMetrics();
+ jvmGcMetrics.bindTo(composite);
+ new ProcessorMetrics().bindTo(composite);
+ new JvmThreadMetrics().bindTo(composite);
+ }
+
+ // adds percentiles and a 10 minute expiry to the meter named "replicationQueue" only
+ MeterFilter replicationFilter = new MeterFilter() {
+ @Override
+ public DistributionStatisticConfig configure(Meter.Id id,
+ @NonNull DistributionStatisticConfig config) {
+ if (id.getName().equals("replicationQueue")) {
+ return DistributionStatisticConfig.builder().percentiles(0.5,
0.75, 0.9, 0.95, 0.99)
+ .expiry(Duration.ofMinutes(10)).build().merge(config);
+ }
+ return config;
+ }
+ };
+
+ if (isMetricsEnabled()) {
+ // user specified registries
+ String userRegistryFactories =
+
context.getConfiguration().get(Property.GENERAL_MICROMETER_FACTORY);
+
+ for (String factoryName : getTrimmedStrings(userRegistryFactories)) {
+ try {
+ MeterRegistry registry = getRegistryFromFactory(factoryName,
context);
+ registry.config().commonTags(commonTags.values());
+ registry.config().meterFilter(replicationFilter);
+ addRegistry(registry);
+ } catch (ClassNotFoundException | NoSuchMethodException |
InvocationTargetException
+ | InstantiationException | IllegalAccessException ex) {
+ // a factory that cannot be loaded is skipped; remaining factories are still tried
+ LOG.warn("Could not load registry {}", factoryName, ex);
+ }
+ }
+ }
+
+ // registries added through addRegistry() before the composite existed
+ pendingRegistries.forEach(registry -> composite.add(registry));
+
+ LOG.info("Metrics initialization. Register producers: {}", producers);
+ producers.forEach(p -> p.registerMetrics(composite));
+
+ Metrics.globalRegistry.add(composite);
+
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Creates a meter registry from the named factory class. The class is first loaded as an spi
+ * MeterRegistryFactory and created with init parameters built from the context; if that
+ * raises a ClassCastException the class is loaded as the deprecated core MeterRegistryFactory
+ * instead.
+ *
+ * @param factoryName the factory class name
+ * @param context the server context used to build the init parameters
+ * @return the registry created by the factory
+ * @throws ClassNotFoundException if neither load attempt succeeds without a
+ * ClassCastException
+ */
+ @VisibleForTesting
+ @SuppressWarnings({"deprecation",
+ "support for org.apache.accumulo.core.metrics.MeterRegistryFactory can
be removed in 3.1"})
+ static MeterRegistry getRegistryFromFactory(final String factoryName, final
ServerContext context)
+ throws ClassNotFoundException, NoSuchMethodException,
InvocationTargetException,
+ InstantiationException, IllegalAccessException {
+ try {
+ LOG.info("look for meter spi registry factory {}", factoryName);
+ Class<? extends
org.apache.accumulo.core.spi.metrics.MeterRegistryFactory> clazz =
+ ClassLoaderUtil.loadClass(factoryName,
+ org.apache.accumulo.core.spi.metrics.MeterRegistryFactory.class);
+ org.apache.accumulo.core.spi.metrics.MeterRegistryFactory factory =
+ clazz.getDeclaredConstructor().newInstance();
+ org.apache.accumulo.core.spi.metrics.MeterRegistryFactory.InitParameters
initParameters =
+ new MeterRegistryEnvPropImpl(context);
+ return factory.create(initParameters);
+ } catch (ClassCastException ex) {
+ // empty. On exception try deprecated version
+ }
+ try {
+ LOG.info("find legacy meter registry factory {}", factoryName);
+ Class<? extends org.apache.accumulo.core.metrics.MeterRegistryFactory>
clazz = ClassLoaderUtil
+ .loadClass(factoryName,
org.apache.accumulo.core.metrics.MeterRegistryFactory.class);
+ org.apache.accumulo.core.metrics.MeterRegistryFactory factory =
+ clazz.getDeclaredConstructor().newInstance();
+ return factory.create();
+ } catch (ClassCastException ex) {
+ // empty. No valid metrics factory, fall through and then throw
exception.
+ }
+ throw new ClassNotFoundException(
+ "Could not find appropriate class implementing a MetricsFactory for: "
+ factoryName);
+ }
+
+  /**
+   * Releases the jvm gc metrics binder and closes the composite registry.
+   * <p>
+   * Uses the same lock as the other methods of this class; synchronizing on {@code this}
+   * would not exclude threads that are inside {@link #init()} or {@link #getRegistry()}. The
+   * composite is removed from the micrometer global registry, where {@link #init()} added it,
+   * before it is closed so the global registry does not keep a closed registry.
+   */
+  @Override
+  public void close() {
+    lock.lock();
+    try {
+      LOG.info("Closing metrics registry");
+      if (jvmGcMetrics != null) {
+        jvmGcMetrics.close();
+        jvmGcMetrics = null;
+      }
+      if (composite != null) {
+        Metrics.globalRegistry.remove(composite);
+        composite.close();
+        composite = null;
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+ /**
+ * Returns a string showing the current common tags.
+ */
+ @Override
+ public String toString() {
+ return "MetricsCommonTags{tags=" + getCommonTags() + '}';
+ }
+}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 0f28ecab8b..249148167d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.PropertyType;
import org.apache.accumulo.core.conf.PropertyType.PortRange;
+import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory;
@@ -173,7 +174,7 @@ public class TServerUtils {
// create the TimedProcessor outside the port search loop so we don't try
to
// register the same
// metrics mbean more than once
- TimedProcessor timedProcessor = new TimedProcessor(processor);
+ TimedProcessor timedProcessor = new TimedProcessor(processor,
context.getMetricsInfo());
HostAndPort[] addresses = getHostAndPorts(hostname, portHint);
try {
@@ -568,16 +569,16 @@ public class TServerUtils {
ThriftServerType serverType, TProcessor processor, String serverName,
String threadName,
int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long
maxMessageSize,
SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
- long serverSocketTimeout, int backlog, HostAndPort... addresses) {
+ long serverSocketTimeout, int backlog, MetricsInfo metricsInfo,
HostAndPort... addresses) {
if (serverType == ThriftServerType.SASL) {
processor = updateSaslProcessor(serverType, processor);
}
try {
- return startTServer(serverType, new TimedProcessor(processor),
serverName, threadName,
- numThreads, threadTimeOut, conf, timeBetweenThreadChecks,
maxMessageSize, sslParams,
- saslParams, serverSocketTimeout, backlog, addresses);
+ return startTServer(serverType, new TimedProcessor(processor,
metricsInfo), serverName,
+ threadName, numThreads, threadTimeOut, conf,
timeBetweenThreadChecks, maxMessageSize,
+ sslParams, saslParams, serverSocketTimeout, backlog, addresses);
} catch (TTransportException e) {
throw new IllegalStateException(e);
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
index 165479a71f..4148cfb0ed 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java
@@ -20,7 +20,7 @@ package org.apache.accumulo.server.rpc;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.server.metrics.ThriftMetrics;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
@@ -35,10 +35,10 @@ public class TimedProcessor implements TProcessor {
private final ThriftMetrics thriftMetrics;
private long idleStart;
- public TimedProcessor(TProcessor next) {
+ public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) {
this.other = next;
thriftMetrics = new ThriftMetrics();
- MetricsUtil.initializeProducers(thriftMetrics);
+ metricsInfo.addMetricsProducers(thriftMetrics);
idleStart = System.nanoTime();
}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java
index 1121b2b726..3789bb3f41 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java
@@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.codec.VersionedProperties;
import org.apache.accumulo.server.conf.store.TablePropKey;
@@ -67,9 +66,6 @@ public class PropCacheCaffeineImplTest {
ticker = new TestTicker();
instanceId = InstanceId.of(UUID.randomUUID());
- PropStoreMetrics cacheMetrics = new PropStoreMetrics();
- MetricsUtil.initializeProducers(cacheMetrics);
-
tablePropKey =
TablePropKey.of(instanceId, TableId.of("t" +
ThreadLocalRandom.current().nextInt(1, 1000)));
@@ -85,7 +81,7 @@ public class PropCacheCaffeineImplTest {
expect(context.getInstanceID()).andReturn(instanceId).anyTimes();
- cache = new PropCacheCaffeineImpl.Builder(zooPropLoader,
cacheMetrics).forTests(ticker).build();
+ cache = new
PropCacheCaffeineImpl.Builder(zooPropLoader).forTests(ticker).build();
}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java
index 748013cd95..71247b4d3b 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java
@@ -39,7 +39,6 @@ import java.util.UUID;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
import org.apache.accumulo.server.conf.codec.VersionedProperties;
@@ -255,12 +254,9 @@ public class PropStoreEventTest {
replay(context, zrw, readyMonitor);
- PropStoreMetrics metrics = new PropStoreMetrics();
- MetricsUtil.initializeProducers(metrics);
+ ZooPropLoader loader = new ZooPropLoader(zrw, propCodec, watcher);
- ZooPropLoader loader = new ZooPropLoader(zrw, propCodec, watcher, metrics);
-
- PropCacheCaffeineImpl cache = new PropCacheCaffeineImpl.Builder(loader,
metrics).build();
+ PropCacheCaffeineImpl cache = new
PropCacheCaffeineImpl.Builder(loader).build();
// load cache
var read1 = cache.get(tablePropKey);
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java
index d9f0f4020e..c47fc49752 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java
@@ -23,7 +23,6 @@ import static
org.apache.accumulo.core.conf.Property.MANAGER_CLIENTPORT;
import static org.apache.accumulo.core.conf.Property.TSERV_CLIENTPORT;
import static org.apache.accumulo.core.conf.Property.TSERV_NATIVEMAP_ENABLED;
import static org.apache.accumulo.core.conf.Property.TSERV_SCAN_MAX_OPENFILES;
-import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
@@ -58,13 +57,9 @@ import org.easymock.Capture;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ZooPropLoaderTest {
- private static final Logger log =
LoggerFactory.getLogger(ZooPropLoaderTest.class);
-
private PropCacheCaffeineImplTest.TestTicker ticker;
private InstanceId instanceId;
private ServerContext context;
@@ -72,7 +67,6 @@ public class ZooPropLoaderTest {
private VersionedPropCodec propCodec;
// mocks
- private PropStoreMetrics cacheMetrics;
private PropStoreWatcher propStoreWatcher;
private ZooReaderWriter zrw;
@@ -92,18 +86,16 @@ public class ZooPropLoaderTest {
zrw = createMock(ZooReaderWriter.class);
- cacheMetrics = createMock(PropStoreMetrics.class);
-
propStoreWatcher = createMock(PropStoreWatcher.class);
// loader used in tests
- loader = new ZooPropLoader(zrw, propCodec, propStoreWatcher, cacheMetrics);
+ loader = new ZooPropLoader(zrw, propCodec, propStoreWatcher);
}
@AfterEach
public void verifyCommonMocks() {
- verify(context, zrw, propStoreWatcher, cacheMetrics);
+ verify(context, zrw, propStoreWatcher);
}
@Test
@@ -122,10 +114,7 @@ public class ZooPropLoaderTest {
return (bytes);
}).once();
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(1);
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
assertNotNull(loader.load(propStoreKey));
}
@@ -158,13 +147,10 @@ public class ZooPropLoaderTest {
return (bytes);
}).once();
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(1);
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader,
cacheMetrics).forTests(ticker).build();
+ new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build();
// load into cache
assertNotNull(cache.get(propStoreKey));
@@ -189,17 +175,12 @@ public class ZooPropLoaderTest {
propStoreWatcher.signalZkChangeEvent(eq(propStoreKey));
expectLastCall();
- cacheMetrics.incrZkError();
- expectLastCall().once();
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader,
cacheMetrics).forTests(ticker).build();
+ new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build();
assertNull(cache.get(propStoreKey));
-
- log.debug("Metrics: {}", cacheMetrics);
}
/**
@@ -225,16 +206,10 @@ public class ZooPropLoaderTest {
return (bytes);
}).times(2);
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(2);
-
- cacheMetrics.incrEviction();
- expectLastCall().once();
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader,
cacheMetrics).forTests(ticker).build();
+ new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build();
// load cache
assertNotNull(cache.get(propStoreKey));
@@ -278,17 +253,10 @@ public class ZooPropLoaderTest {
propStoreWatcher.signalCacheChangeEvent(anyObject());
expectLastCall().anyTimes();
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(1);
- cacheMetrics.incrEviction();
- expectLastCall().times(1);
- cacheMetrics.incrZkError();
- expectLastCall().times(1);
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader,
cacheMetrics).forTests(ticker).build();
+ new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build();
// prime cache
assertNotNull(cache.get(propStoreKey));
@@ -307,10 +275,10 @@ public class ZooPropLoaderTest {
@Test
public void getIfCachedTest() {
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader,
cacheMetrics).forTests(ticker).build();
+ new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build();
assertNull(cache.getIfCached(propStoreKey));
@@ -348,13 +316,10 @@ public class ZooPropLoaderTest {
return (bytes);
}).once();
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(2);
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader,
cacheMetrics).forTests(ticker).build();
+ new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build();
// load into cache
assertNotNull(cache.get(sysPropKey));
@@ -399,13 +364,10 @@ public class ZooPropLoaderTest {
return (bytes);
}).once();
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(2);
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader,
cacheMetrics).forTests(ticker).build();
+ new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build();
// load into cache
assertNotNull(cache.get(sysPropKey));
@@ -420,10 +382,10 @@ public class ZooPropLoaderTest {
@Test
public void getIfCachedNotPresentTest() {
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader,
cacheMetrics).forTests(ticker).build();
+ new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build();
// load into cache
assertNull(cache.getIfCached(propStoreKey));
@@ -451,7 +413,7 @@ public class ZooPropLoaderTest {
return propCodec.toBytes(vProps);
}).anyTimes();
- replay(context, zrw, propStoreWatcher, cacheMetrics);
+ replay(context, zrw, propStoreWatcher);
Stat statCheck = new Stat();
statCheck.setVersion(9);
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImplTest.java
b/server/base/src/test/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImplTest.java
new file mode 100644
index 0000000000..ff0f958dd5
--- /dev/null
+++
b/server/base/src/test/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImplTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.server.ServerContext;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Test;
+
+public class MeterRegistryEnvPropImplTest {
+
+ @Test
+ public void customParamsTest() {
+ ServerContext context = EasyMock.createMock(ServerContext.class);
+ ConfigurationCopy conf = new ConfigurationCopy();
+ conf.set("unknown", "none");
+ conf.set("general.custom.metrics.opts.sample.p1", "v1");
+ conf.set("general.custom.metrics.opts.sample.p2", "v2");
+
+ expect(context.getConfiguration()).andReturn(conf).anyTimes();
+
+ replay(context);
+ MeterRegistryEnvPropImpl env = new MeterRegistryEnvPropImpl(context);
+ assertEquals(Map.of("sample.p1", "v1", "sample.p2", "v2"),
env.getOptions());
+ assertNotNull(env.getServiceEnv());
+ }
+}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/metrics/MetricsInfoImplTest.java
b/server/base/src/test/java/org/apache/accumulo/server/metrics/MetricsInfoImplTest.java
new file mode 100644
index 0000000000..5ffb615685
--- /dev/null
+++
b/server/base/src/test/java/org/apache/accumulo/server/metrics/MetricsInfoImplTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metrics;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.server.ServerContext;
+import org.junit.jupiter.api.Test;
+
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+
+public class MetricsInfoImplTest {
+ @Test
+ public void factoryTest() throws Exception {
+
+ ServerContext context = mock(ServerContext.class);
+ AccumuloConfiguration conf = mock(AccumuloConfiguration.class);
+ expect(context.getConfiguration()).andReturn(conf).anyTimes();
+
expect(conf.getAllPropertiesWithPrefixStripped(anyObject())).andReturn(Map.of()).anyTimes();
+ expect(conf.newDeriver(anyObject())).andReturn(Map::of).anyTimes();
+ replay(context, conf);
+
assertNotNull(MetricsInfoImpl.getRegistryFromFactory(SPIFactory.class.getName(),
context));
+
+ assertNotNull(
+
MetricsInfoImpl.getRegistryFromFactory(DeprecatedFactory.class.getName(),
context));
+
+ assertThrows(ClassNotFoundException.class,
+ () -> MetricsInfoImpl.getRegistryFromFactory(String.class.getName(),
context));
+
+ verify(context, conf);
+ }
+
+ @SuppressWarnings({"deprecation",
+ "support for org.apache.accumulo.core.metrics.MeterRegistryFactory can
be removed in 3.1"})
+ static final class DeprecatedFactory
+ implements org.apache.accumulo.core.metrics.MeterRegistryFactory {
+ DeprecatedFactory() {
+
+ }
+
+ @Override
+ public MeterRegistry create() {
+ return new SimpleMeterRegistry();
+ }
+ }
+
+ static class SPIFactory implements
org.apache.accumulo.core.spi.metrics.MeterRegistryFactory {
+
+ SPIFactory() {
+
+ }
+
+ @Override
+ public MeterRegistry create(final InitParameters params) {
+ return new SimpleMeterRegistry();
+ }
+ }
+}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
index 04bd9c6d64..80d261f093 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.accumulo.server.rpc;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -41,6 +43,8 @@ import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.metrics.MetricsInfo;
+import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.client.ClientServiceHandler;
@@ -54,6 +58,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class TServerUtilsTest {
private ServerContext context;
+ private MetricsInfo metricsInfo;
private final ConfigurationCopy conf = new
ConfigurationCopy(DefaultConfiguration.getInstance());
@BeforeEach
@@ -72,12 +77,16 @@ public class TServerUtilsTest {
expect(context.getSaslParams()).andReturn(null).anyTimes();
expect(context.getClientTimeoutInMillis()).andReturn((long)
1000).anyTimes();
expect(context.getSecurityOperation()).andReturn(null).anyTimes();
- replay(context);
+ metricsInfo = createMock(MetricsInfo.class);
+ metricsInfo.addMetricsProducers(anyObject(MetricsProducer.class));
+ expectLastCall().anyTimes();
+ expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes();
+ replay(context, metricsInfo);
}
@AfterEach
public void verifyMockServerContext() {
- verify(context);
+ verify(context, metricsInfo);
}
@Test
@@ -200,7 +209,7 @@ public class TServerUtilsTest {
assertNotNull(server);
// Finally ensure that the TServer is using the last port (i.e. port
search worked)
- assertTrue(address.getAddress().getPort() == tserverFinalPort);
+ assertEquals(address.getAddress().getPort(), tserverFinalPort);
} finally {
if (server != null) {
server.stop();
@@ -253,9 +262,9 @@ public class TServerUtilsTest {
}
private int[] findTwoFreeSequentialPorts(int startingAddress) throws
UnknownHostException {
- boolean sequential = false;
+ boolean sequential;
int low = startingAddress;
- int high = 0;
+ int high;
do {
low = getFreePort(low);
high = getFreePort(low + 1);
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index c86a93350c..ec8cdc514c 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.coordinator;
import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
-import java.lang.reflect.InvocationTargetException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -67,7 +66,7 @@ import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
@@ -263,15 +262,9 @@ public class CompactionCoordinator extends AbstractServer
throw new IllegalStateException("Exception getting Coordinator lock", e);
}
- try {
- MetricsUtil.initializeMetrics(getContext().getConfiguration(),
this.applicationName,
- clientAddress, getContext().getInstanceName());
- MetricsUtil.initializeProducers(this);
- } catch (ClassNotFoundException | InstantiationException |
IllegalAccessException
- | IllegalArgumentException | InvocationTargetException |
NoSuchMethodException
- | SecurityException e1) {
- LOG.error("Error initializing metrics, metrics will not be emitted.",
e1);
- }
+ MetricsInfo metricsInfo = getContext().getMetricsInfo();
+ metricsInfo.addServiceTags(getApplicationName(), clientAddress);
+ metricsInfo.init();
// On a re-start of the coordinator it's possible that external
compactions are in-progress.
// Attempt to get the running compactions on the compactors and then
resolve which tserver
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index f4affca907..6b0931b3d4 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -24,7 +24,6 @@ import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
@@ -78,8 +77,8 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.metrics.MetricsProducer;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
@@ -586,16 +585,11 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
throw new RuntimeException("Error registering compactor in ZooKeeper",
e);
}
- try {
- MetricsUtil.initializeMetrics(getContext().getConfiguration(),
this.applicationName,
- clientAddress, getContext().getInstanceName());
- pausedMetrics = new PausedCompactionMetrics();
- MetricsUtil.initializeProducers(this, pausedMetrics);
- } catch (ClassNotFoundException | InstantiationException |
IllegalAccessException
- | IllegalArgumentException | InvocationTargetException |
NoSuchMethodException
- | SecurityException e1) {
- LOG.error("Error initializing metrics, metrics will not be emitted.",
e1);
- }
+ MetricsInfo metricsInfo = getContext().getMetricsInfo();
+ metricsInfo.addServiceTags(getApplicationName(), clientAddress);
+
+ metricsInfo.addMetricsProducers(this, pausedMetrics);
+ metricsInfo.init();
var watcher = new CompactionWatcher(getConfiguration());
var schedExecutor = ThreadPools.getServerThreadPools()
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 968e7d2b8b..e9c23b1729 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -22,7 +22,6 @@ import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
@@ -43,7 +42,7 @@ import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Halt;
@@ -155,16 +154,11 @@ public class SimpleGarbageCollector extends
AbstractServer implements Iface {
System.exit(1);
}
- try {
- MetricsUtil.initializeMetrics(getContext().getConfiguration(),
this.applicationName, address,
- getContext().getInstanceName());
- MetricsUtil.initializeProducers(this, new GcMetrics(this));
- } catch (ClassNotFoundException | InstantiationException |
IllegalAccessException
- | IllegalArgumentException | InvocationTargetException |
NoSuchMethodException
- | SecurityException e1) {
- log.error("Error initializing metrics, metrics will not be emitted.",
e1);
- }
+ MetricsInfo metricsInfo = getContext().getMetricsInfo();
+ metricsInfo.addServiceTags(getApplicationName(), address);
+ metricsInfo.addMetricsProducers(this, new GcMetrics(this));
+ metricsInfo.init();
try {
long delay = getStartDelay();
log.debug("Sleeping for {} milliseconds before beginning garbage
collection cycles", delay);
@@ -378,7 +372,8 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
getContext().getThriftServerType(), processor,
this.getClass().getSimpleName(),
"GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
maxMessageSize,
getContext().getServerSslParams(), getContext().getSaslParams(), 0,
- getConfiguration().getCount(Property.RPC_BACKLOG), addresses);
+ getConfiguration().getCount(Property.RPC_BACKLOG),
getContext().getMetricsInfo(),
+ addresses);
log.debug("Starting garbage collector listening on " + server.address);
return server.address;
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index cbbba0fb7f..9cead258f7 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -28,7 +28,6 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
@@ -98,7 +97,8 @@ import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
-import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.metrics.MetricsInfo;
+import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.spi.balancer.BalancerEnvironment;
import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer;
@@ -1104,16 +1104,12 @@ public class Manager extends AbstractServer
managerUpgrading.set(true);
}
- try {
- MetricsUtil.initializeMetrics(getContext().getConfiguration(),
this.applicationName,
- sa.getAddress(), getContext().getInstanceName());
- ManagerMetrics mm = new ManagerMetrics(getConfiguration(), this);
- MetricsUtil.initializeProducers(this, mm);
- } catch (ClassNotFoundException | InstantiationException |
IllegalAccessException
- | IllegalArgumentException | InvocationTargetException |
NoSuchMethodException
- | SecurityException e1) {
- log.error("Error initializing metrics, metrics will not be emitted.",
e1);
- }
+ MetricsInfo metricsInfo = getContext().getMetricsInfo();
+ metricsInfo.addServiceTags(getApplicationName(), sa.getAddress());
+
+ var producers = ManagerMetrics.getProducers(getConfiguration(), this);
+ metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0]));
+ metricsInfo.init();
recoveryManager = new RecoveryManager(this,
timeToCacheRecoveryWalExistence);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
index 285df23a69..02a30d4d22 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java
@@ -18,7 +18,8 @@
*/
package org.apache.accumulo.manager.metrics;
-import static java.util.Objects.requireNonNull;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -26,21 +27,12 @@ import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.metrics.fate.FateMetrics;
-import io.micrometer.core.instrument.MeterRegistry;
+public class ManagerMetrics {
-public class ManagerMetrics implements MetricsProducer {
-
- private final FateMetrics fateMetrics;
-
- public ManagerMetrics(final AccumuloConfiguration conf, final Manager
manager) {
- requireNonNull(conf, "AccumuloConfiguration must not be null");
- requireNonNull(conf, "Manager must not be null");
- fateMetrics = new FateMetrics(manager.getContext(),
-
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL));
- }
-
- @Override
- public void registerMetrics(MeterRegistry registry) {
- fateMetrics.registerMetrics(registry);
+ public static List<MetricsProducer> getProducers(AccumuloConfiguration conf,
Manager manager) {
+ ArrayList<MetricsProducer> producers = new ArrayList<>();
+ producers.add(new FateMetrics(manager.getContext(),
+
conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
+ return producers;
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
index 800455fe0e..bed2e53411 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.manager.metrics.fate;
+import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -28,7 +29,6 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.ReadOnlyTStore;
import org.apache.accumulo.core.fate.ZooStore;
import org.apache.accumulo.core.metrics.MetricsProducer;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.KeeperException;
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
public class FateMetrics implements MetricsProducer {
@@ -127,27 +128,31 @@ public class FateMetrics implements MetricsProducer {
@Override
public void registerMetrics(final MeterRegistry registry) {
- totalCurrentOpsGauge = registry.gauge(METRICS_FATE_TOTAL_IN_PROGRESS,
- MetricsUtil.getCommonTags(), new AtomicLong(0));
- totalOpsGauge =
- registry.gauge(METRICS_FATE_OPS_ACTIVITY, MetricsUtil.getCommonTags(),
new AtomicLong(0));
- fateErrorsGauge = registry.gauge(METRICS_FATE_ERRORS,
- Tags.concat(MetricsUtil.getCommonTags(), "type", "zk.connection"), new
AtomicLong(0));
- newTxGauge = registry.gauge(METRICS_FATE_TX,
Tags.concat(MetricsUtil.getCommonTags(), "state",
- ReadOnlyTStore.TStatus.NEW.name().toLowerCase()), new AtomicLong(0));
- submittedTxGauge = registry.gauge(METRICS_FATE_TX,
Tags.concat(MetricsUtil.getCommonTags(),
- "state", ReadOnlyTStore.TStatus.SUBMITTED.name().toLowerCase()), new
AtomicLong(0));
- inProgressTxGauge = registry.gauge(METRICS_FATE_TX,
Tags.concat(MetricsUtil.getCommonTags(),
- "state", ReadOnlyTStore.TStatus.IN_PROGRESS.name().toLowerCase()), new
AtomicLong(0));
- failedInProgressTxGauge =
- registry.gauge(METRICS_FATE_TX,
Tags.concat(MetricsUtil.getCommonTags(), "state",
- ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase()),
new AtomicLong(0));
- failedTxGauge = registry.gauge(METRICS_FATE_TX,
Tags.concat(MetricsUtil.getCommonTags(),
- "state", ReadOnlyTStore.TStatus.FAILED.name().toLowerCase()), new
AtomicLong(0));
- successfulTxGauge = registry.gauge(METRICS_FATE_TX,
Tags.concat(MetricsUtil.getCommonTags(),
- "state", ReadOnlyTStore.TStatus.SUCCESSFUL.name().toLowerCase()), new
AtomicLong(0));
- unknownTxGauge = registry.gauge(METRICS_FATE_TX,
Tags.concat(MetricsUtil.getCommonTags(),
- "state", ReadOnlyTStore.TStatus.UNKNOWN.name().toLowerCase()), new
AtomicLong(0));
+ totalCurrentOpsGauge = registry.gauge(METRICS_FATE_TOTAL_IN_PROGRESS, new
AtomicLong(0));
+ totalOpsGauge = registry.gauge(METRICS_FATE_OPS_ACTIVITY, new
AtomicLong(0));
+ fateErrorsGauge = registry.gauge(METRICS_FATE_ERRORS,
List.of(Tag.of("type", "zk.connection")),
+ new AtomicLong(0));
+ newTxGauge = registry.gauge(METRICS_FATE_TX,
+ List.of(Tag.of("state",
ReadOnlyTStore.TStatus.NEW.name().toLowerCase())),
+ new AtomicLong(0));
+ submittedTxGauge = registry.gauge(METRICS_FATE_TX,
+ List.of(Tag.of("state",
ReadOnlyTStore.TStatus.SUBMITTED.name().toLowerCase())),
+ new AtomicLong(0));
+ inProgressTxGauge = registry.gauge(METRICS_FATE_TX,
+ List.of(Tag.of("state",
ReadOnlyTStore.TStatus.IN_PROGRESS.name().toLowerCase())),
+ new AtomicLong(0));
+ failedInProgressTxGauge = registry.gauge(METRICS_FATE_TX,
+ List.of(Tag.of("state",
ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase())),
+ new AtomicLong(0));
+ failedTxGauge = registry.gauge(METRICS_FATE_TX,
+ List.of(Tag.of("state",
ReadOnlyTStore.TStatus.FAILED.name().toLowerCase())),
+ new AtomicLong(0));
+ successfulTxGauge = registry.gauge(METRICS_FATE_TX,
+ List.of(Tag.of("state",
ReadOnlyTStore.TStatus.SUCCESSFUL.name().toLowerCase())),
+ new AtomicLong(0));
+ unknownTxGauge = registry.gauge(METRICS_FATE_TX,
+ List.of(Tag.of("state",
ReadOnlyTStore.TStatus.UNKNOWN.name().toLowerCase())),
+ new AtomicLong(0));
update();
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index a93919d61c..fde338f422 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -65,6 +65,7 @@ import
org.apache.accumulo.core.manager.thrift.ManagerClientService;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
import org.apache.accumulo.core.manager.thrift.TableInfo;
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletscan.thrift.ActiveScan;
@@ -490,6 +491,11 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
}
log.debug("Using {} to advertise monitor location in ZooKeeper",
advertiseHost);
+ MetricsInfo metricsInfo = getContext().getMetricsInfo();
+ metricsInfo.addServiceTags(getApplicationName(),
+ HostAndPort.fromParts(advertiseHost, livePort));
+ metricsInfo.init();
+
try {
URL url = new URL(server.isSecure() ? "https" : "http", advertiseHost,
server.getPort(), "/");
final String path = context.getZooKeeperRoot() +
Constants.ZMONITOR_HTTP_ADDR;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 8fcc742ba2..ca4b28d06e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -22,7 +22,6 @@ import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.lang.reflect.InvocationTargetException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -78,7 +77,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
-import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.tabletscan.thrift.ActiveScan;
import org.apache.accumulo.core.tabletscan.thrift.TSampleNotPresentException;
@@ -374,17 +373,13 @@ public class ScanServer extends AbstractServer
throw new RuntimeException("Failed to start the compactor client
service", e1);
}
- try {
- MetricsUtil.initializeMetrics(getContext().getConfiguration(),
this.applicationName,
- clientAddress, getContext().getInstanceName());
- scanMetrics = new TabletServerScanMetrics();
- MetricsUtil.initializeProducers(this, scanMetrics);
- } catch (ClassNotFoundException | InstantiationException |
IllegalAccessException
- | IllegalArgumentException | InvocationTargetException |
NoSuchMethodException
- | SecurityException e1) {
- LOG.error("Error initializing metrics, metrics will not be emitted.",
e1);
- }
+ MetricsInfo metricsInfo = getContext().getMetricsInfo();
+ metricsInfo.addServiceTags(getApplicationName(), clientAddress);
+
+ scanMetrics = new TabletServerScanMetrics();
+ metricsInfo.addMetricsProducers(this, scanMetrics);
+ metricsInfo.init();
// We need to set the compaction manager so that we don't get an NPE in
CompactableImpl.close
ServiceLock lock = announceExistence();
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 678b1294c5..89d503ef58 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -30,7 +30,6 @@ import static
org.apache.accumulo.core.util.threads.ThreadPools.watchNonCritical
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.lang.reflect.InvocationTargetException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.Instant;
@@ -93,7 +92,7 @@ import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
-import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment;
@@ -705,24 +704,19 @@ public class TabletServer extends AbstractServer
implements TabletHostingServer
throw new RuntimeException("Failed to start the tablet client service",
e1);
}
- try {
- MetricsUtil.initializeMetrics(context.getConfiguration(),
this.applicationName, clientAddress,
- getContext().getInstanceName());
-
- metrics = new TabletServerMetrics(this);
- updateMetrics = new TabletServerUpdateMetrics();
- scanMetrics = new TabletServerScanMetrics();
- mincMetrics = new TabletServerMinCMetrics();
- ceMetrics = new CompactionExecutorsMetrics();
- pausedMetrics = new PausedCompactionMetrics();
- MetricsUtil.initializeProducers(this, metrics, updateMetrics,
scanMetrics, mincMetrics,
- ceMetrics, pausedMetrics);
-
- } catch (ClassNotFoundException | InstantiationException |
IllegalAccessException
- | IllegalArgumentException | InvocationTargetException |
NoSuchMethodException
- | SecurityException e1) {
- log.error("Error initializing metrics, metrics will not be emitted.",
e1);
- }
+ MetricsInfo metricsInfo = getContext().getMetricsInfo();
+ metricsInfo.addServiceTags(getApplicationName(), clientAddress);
+
+ metrics = new TabletServerMetrics(this);
+ updateMetrics = new TabletServerUpdateMetrics();
+ scanMetrics = new TabletServerScanMetrics();
+ mincMetrics = new TabletServerMinCMetrics();
+ ceMetrics = new CompactionExecutorsMetrics();
+ pausedMetrics = new PausedCompactionMetrics();
+
+ metricsInfo.addMetricsProducers(metrics, updateMetrics, scanMetrics,
mincMetrics, ceMetrics,
+ pausedMetrics);
+ metricsInfo.init();
this.compactionManager = new CompactionManager(() -> Iterators
.transform(onlineTablets.snapshot().values().iterator(),
Tablet::asCompactable),
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index de47b841fd..2bb80309a1 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -140,7 +140,7 @@ public class TabletServerResourceManager {
}
private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec,
- Map<String,Queue<Runnable>> scanExecQueues) {
+ Map<String,Queue<Runnable>> scanExecQueues, final boolean enableMetrics)
{
BlockingQueue<Runnable> queue;
@@ -188,7 +188,7 @@ public class TabletServerResourceManager {
ThreadPoolExecutor es =
ThreadPools.getServerThreadPools().getPoolBuilder("scan-" + sec.name)
.numCoreThreads(sec.getCurrentMaxThreads()).numMaxThreads(sec.getCurrentMaxThreads())
.withTimeOut(0L,
MILLISECONDS).withQueue(queue).atPriority(sec.priority)
- .enableThreadPoolMetrics().build();
+ .enableThreadPoolMetrics(enableMetrics).build();
modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" +
sec.name, es);
return es;
@@ -251,6 +251,7 @@ public class TabletServerResourceManager {
public TabletServerResourceManager(ServerContext context,
TabletHostingServer tserver) {
this.context = context;
final AccumuloConfiguration acuConf = context.getConfiguration();
+ final boolean enableMetrics = context.getMetricsInfo().isMetricsEnabled();
// acuConf.getBoolean(Property.GENERAL_MICROMETER_ENABLED);
long maxMemory = acuConf.getAsBytes(Property.TSERV_MAXMEM);
boolean usingNativeMap =
acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED);
if (usingNativeMap) {
@@ -307,20 +308,20 @@ public class TabletServerResourceManager {
() ->
context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
"minor compactor", minorCompactionThreadPool);
- splitThreadPool =
-
ThreadPools.getServerThreadPools().getPoolBuilder("splitter").numCoreThreads(0)
- .numMaxThreads(1).withTimeOut(1,
SECONDS).enableThreadPoolMetrics().build();
+ splitThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("splitter")
+ .numCoreThreads(0).numMaxThreads(1).withTimeOut(1, SECONDS)
+ .enableThreadPoolMetrics(enableMetrics).build();
- defaultSplitThreadPool =
- ThreadPools.getServerThreadPools().getPoolBuilder("md
splitter").numCoreThreads(0)
- .numMaxThreads(1).withTimeOut(60,
SECONDS).enableThreadPoolMetrics().build();
+ defaultSplitThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("md splitter")
+ .numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS)
+ .enableThreadPoolMetrics(enableMetrics).build();
defaultMigrationPool = ThreadPools.getServerThreadPools()
.getPoolBuilder("metadata tablet
migration").numCoreThreads(0).numMaxThreads(1)
- .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build();
+ .withTimeOut(60,
SECONDS).enableThreadPoolMetrics(enableMetrics).build();
migrationPool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
- Property.TSERV_MIGRATE_MAXCONCURRENT, true);
+ Property.TSERV_MIGRATE_MAXCONCURRENT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT),
"tablet migration", migrationPool);
@@ -331,31 +332,31 @@ public class TabletServerResourceManager {
// individual tablet server run
// concurrent assignments would put more load on the metadata table at
startup
assignmentPool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
- Property.TSERV_ASSIGNMENT_MAXCONCURRENT, true);
+ Property.TSERV_ASSIGNMENT_MAXCONCURRENT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT),
"tablet assignment", assignmentPool);
assignMetaDataPool = ThreadPools.getServerThreadPools()
.getPoolBuilder("metadata tablet
assignment").numCoreThreads(0).numMaxThreads(1)
- .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build();
+ .withTimeOut(60,
SECONDS).enableThreadPoolMetrics(enableMetrics).build();
activeAssignments = new ConcurrentHashMap<>();
summaryRetrievalPool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
- Property.TSERV_SUMMARY_RETRIEVAL_THREADS, true);
+ Property.TSERV_SUMMARY_RETRIEVAL_THREADS, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS),
"summary file retriever", summaryRetrievalPool);
summaryRemotePool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
- Property.TSERV_SUMMARY_REMOTE_THREADS, true);
+ Property.TSERV_SUMMARY_REMOTE_THREADS, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS),
"summary remote", summaryRemotePool);
summaryPartitionPool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
- Property.TSERV_SUMMARY_PARTITION_THREADS, true);
+ Property.TSERV_SUMMARY_PARTITION_THREADS, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() ->
context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS),
"summary partition", summaryPartitionPool);
@@ -364,8 +365,8 @@ public class TabletServerResourceManager {
Collection<ScanExecutorConfig> scanExecCfg =
acuConf.getScanExecutors(isScanServer);
Map<String,Queue<Runnable>> scanExecQueues = new HashMap<>();
- scanExecutors = scanExecCfg.stream().collect(
- toUnmodifiableMap(cfg -> cfg.name, cfg -> createPriorityExecutor(cfg,
scanExecQueues)));
+ scanExecutors = scanExecCfg.stream().collect(toUnmodifiableMap(cfg ->
cfg.name,
+ cfg -> createPriorityExecutor(cfg, scanExecQueues, enableMetrics)));
scanExecutorChoices = scanExecCfg.stream().collect(toUnmodifiableMap(cfg
-> cfg.name,
cfg -> new ScanExecutorImpl(cfg, scanExecQueues.get(cfg.name))));
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java
index 3df8aff090..a7f402343c 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.tserver.metrics;
import java.time.Duration;
import org.apache.accumulo.core.metrics.MetricsProducer;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
@@ -42,11 +41,10 @@ public class TabletServerMinCMetrics implements
MetricsProducer {
@Override
public void registerMetrics(MeterRegistry registry) {
activeMinc = Timer.builder(METRICS_MINC_RUNNING).description("Minor
compactions time active")
- .tags(MetricsUtil.getCommonTags()).register(registry);
+ .register(registry);
- queuedMinc =
- Timer.builder(METRICS_MINC_QUEUED).description("Queued minor
compactions time queued")
- .tags(MetricsUtil.getCommonTags()).register(registry);
+ queuedMinc = Timer.builder(METRICS_MINC_QUEUED)
+ .description("Queued minor compactions time
queued").register(registry);
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
index 2dc18a5cbe..9a1faa6261 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
@@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.apache.accumulo.core.metrics.MetricsProducer;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
@@ -138,18 +137,14 @@ public class TabletServerScanMetrics implements
MetricsProducer {
.description("Results per scan").register(registry);
yields =
DistributionSummary.builder(METRICS_SCAN_YIELDS).description("yields").register(registry);
- startScanCalls =
- Counter.builder(METRICS_SCAN_START).description("calls to start a scan
/ multiscan")
- .tags(MetricsUtil.getCommonTags()).register(registry);
- continueScanCalls =
- Counter.builder(METRICS_SCAN_CONTINUE).description("calls to continue
a scan / multiscan")
- .tags(MetricsUtil.getCommonTags()).register(registry);
- closeScanCalls =
- Counter.builder(METRICS_SCAN_CLOSE).description("calls to close a scan
/ multiscan")
- .tags(MetricsUtil.getCommonTags()).register(registry);
+ startScanCalls = Counter.builder(METRICS_SCAN_START)
+ .description("calls to start a scan / multiscan").register(registry);
+ continueScanCalls = Counter.builder(METRICS_SCAN_CONTINUE)
+ .description("calls to continue a scan /
multiscan").register(registry);
+ closeScanCalls = Counter.builder(METRICS_SCAN_CLOSE)
+ .description("calls to close a scan / multiscan").register(registry);
busyTimeoutReturned = Counter.builder(METRICS_SCAN_BUSY_TIMEOUT)
- .description("times that a scan has timed out in the queue")
- .tags(MetricsUtil.getCommonTags()).register(registry);
+ .description("times that a scan has timed out in the
queue").register(registry);
Gauge.builder(METRICS_TSERVER_QUERIES, this,
TabletServerScanMetrics::getLookupCount)
.description("Number of queries").register(registry);
Gauge.builder(METRICS_TSERVER_SCAN_RESULTS, this,
TabletServerScanMetrics::getQueryResultCount)
@@ -161,11 +156,10 @@ public class TabletServerScanMetrics implements
MetricsProducer {
Gauge.builder(METRICS_TSERVER_SCANNED_ENTRIES, this,
TabletServerScanMetrics::getScannedCount)
.description("Scanned rate").register(registry);
pausedForMemory = Counter.builder(METRICS_SCAN_PAUSED_FOR_MEM)
- .description("scan paused due to server being low on memory")
- .tags(MetricsUtil.getCommonTags()).register(registry);
+ .description("scan paused due to server being low on
memory").register(registry);
earlyReturnForMemory = Counter.builder(METRICS_SCAN_RETURN_FOR_MEM)
.description("scan returned results early due to server being low on
memory")
- .tags(MetricsUtil.getCommonTags()).register(registry);
+ .register(registry);
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 5d8c81ec73..567d8171e1 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -832,6 +832,7 @@ public class CompactableImpl implements Compactable {
synchronized (this) {
if (closed) {
+ log.trace("Selection of files was not initiated {} because closed",
getExtent());
return;
}
@@ -842,6 +843,14 @@ public class CompactableImpl implements Compactable {
log.trace("Selected compaction status changed {} {} {} {}",
getExtent(),
fileMgr.getSelectionStatus(), compactionId, compactionConfig);
} else {
+ if (kind == CompactionKind.USER) {
+ // Only log for user compaction because this code is only called
when one is initiated via
+ // the API call. For other compaction kinds the tserver will keep
periodically attempting
+ // to initiate which would result in lots of logs.
+ log.trace(
+ "Selection of files was not initiated {} compactionId:{}
selectStatus:{} selectedFiles:{}",
+ getExtent(), this.compactionId, fileMgr.selectStatus,
fileMgr.selectedFiles.size());
+ }
return;
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java
b/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java
index 5abb7057d3..22d06b83a1 100644
---
a/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java
@@ -41,13 +41,11 @@ import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
-import org.apache.accumulo.core.metrics.MetricsUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
import org.apache.accumulo.server.conf.codec.VersionedProperties;
import org.apache.accumulo.server.conf.store.TablePropKey;
import org.apache.accumulo.server.conf.store.impl.PropCacheCaffeineImpl;
-import org.apache.accumulo.server.conf.store.impl.PropStoreMetrics;
import org.apache.accumulo.server.conf.store.impl.PropStoreWatcher;
import org.apache.accumulo.server.conf.store.impl.ReadyMonitor;
import org.apache.accumulo.server.conf.store.impl.ZooPropLoader;
@@ -79,7 +77,6 @@ public class PropCacheCaffeineImplZkIT {
private final TableId tIdA = TableId.of("A");
private final TableId tIdB = TableId.of("B");
- private final PropStoreMetrics cacheMetrics = new PropStoreMetrics();
private static ServerContext context;
@TempDir
@@ -161,12 +158,9 @@ public class PropCacheCaffeineImplZkIT {
PropStoreWatcher propStoreWatcher = new PropStoreWatcher(readyMonitor);
- MetricsUtil.initializeProducers(cacheMetrics);
-
ZooPropLoader propLoader =
- new ZooPropLoader(zrw, VersionedPropCodec.getDefault(),
propStoreWatcher, cacheMetrics);
- PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).build();
+ new ZooPropLoader(zrw, VersionedPropCodec.getDefault(),
propStoreWatcher);
+ PropCacheCaffeineImpl cache = new
PropCacheCaffeineImpl.Builder(propLoader).build();
VersionedProperties readProps = cache.get(propStoreKey);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 2a8abcfffb..a56c34fe3f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -127,7 +127,7 @@ public class ZombieTServer {
ServerAddress serverPort =
TServerUtils.startTServer(context.getConfiguration(),
ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", "walking
dead", 2,
ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null,
null, -1,
- context.getConfiguration().getCount(Property.RPC_BACKLOG),
+ context.getConfiguration().getCount(Property.RPC_BACKLOG),
context.getMetricsInfo(),
HostAndPort.fromParts("0.0.0.0", port));
String addressString = serverPort.address.toString();
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index 2342a77559..4b92480f19 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
@@ -79,11 +80,14 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
cfg.setProperty(Property.GC_CYCLE_START, "1s");
cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
cfg.setProperty(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, "1s");
-
- // Tell the server processes to use a StatsDMeterRegistry that will be
configured
- // to push all metrics to the sink we started.
+ // Tell the server processes to use a StatsDMeterRegistry and the simple
logging registry
+ // that will be configured to push all metrics to the sink we started.
cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
- cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY,
TestStatsDRegistryFactory.class.getName());
+ cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true");
+ cfg.setProperty("general.custom.metrics.opts.logging.step", "1s");
+ String clazzList = LoggingMeterRegistryFactory.class.getName() + ","
+ + TestStatsDRegistryFactory.class.getName();
+ cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList);
Map<String,String> sysProps =
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
TestStatsDRegistryFactory.SERVER_PORT,
Integer.toString(sink.getPort()));
cfg.setSystemProperties(sysProps);
@@ -98,9 +102,7 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
Set<String> unexpectedMetrics = Set.of(METRICS_SCAN_YIELDS,
METRICS_UPDATE_ERRORS,
METRICS_COMPACTOR_MAJC_STUCK, METRICS_SCAN_BUSY_TIMEOUT,
METRICS_SCAN_PAUSED_FOR_MEM,
METRICS_SCAN_RETURN_FOR_MEM, METRICS_MINC_PAUSED, METRICS_MAJC_PAUSED);
- Set<String> flakyMetrics = Set.of(METRICS_GC_WAL_ERRORS,
METRICS_FATE_TYPE_IN_PROGRESS,
- METRICS_PROPSTORE_EVICTION_COUNT, METRICS_PROPSTORE_REFRESH_COUNT,
- METRICS_PROPSTORE_REFRESH_LOAD_COUNT,
METRICS_PROPSTORE_ZK_ERROR_COUNT);
+ Set<String> flakyMetrics = Set.of(METRICS_GC_WAL_ERRORS,
METRICS_FATE_TYPE_IN_PROGRESS);
Map<String,String> expectedMetricNames = this.getMetricFields();
flakyMetrics.forEach(expectedMetricNames::remove); // might not see these
diff --git
a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
index 6c26fabb4d..8715a40c00 100644
---
a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
+++
b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java
@@ -20,7 +20,7 @@ package org.apache.accumulo.test.metrics;
import java.time.Duration;
-import org.apache.accumulo.core.metrics.MeterRegistryFactory;
+import org.apache.accumulo.core.spi.metrics.MeterRegistryFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +38,8 @@ public class TestStatsDRegistryFactory implements
MeterRegistryFactory {
public static final String SERVER_PORT = "test.meter.registry.port";
@Override
- public MeterRegistry create() {
-
+ public MeterRegistry create(final InitParameters params) {
+ LOG.info("starting metrics registration.");
String host = System.getProperty(SERVER_HOST, null);
String port = System.getProperty(SERVER_PORT, null);
diff --git
a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index 512acb5c48..8e5a96c51e 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -339,7 +339,7 @@ public class NullTserver {
TServerUtils.startTServer(context.getConfiguration(),
ThriftServerType.CUSTOM_HS_HA,
muxProcessor, "NullTServer", "null tserver", 2,
ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
10 * 1024 * 1024, null, null, -1,
context.getConfiguration().getCount(Property.RPC_BACKLOG),
- HostAndPort.fromParts("0.0.0.0", opts.port));
+ context.getMetricsInfo(), HostAndPort.fromParts("0.0.0.0", opts.port));
HostAndPort addr =
HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);
diff --git a/test/src/main/resources/log4j2-test.properties
b/test/src/main/resources/log4j2-test.properties
index f0d7d93212..0c77a3871b 100644
--- a/test/src/main/resources/log4j2-test.properties
+++ b/test/src/main/resources/log4j2-test.properties
@@ -142,28 +142,5 @@ logger.38.level = debug
logger.39.name = org.apache.accumulo.manager.Manager
logger.39.level = trace
-property.metricsFilename = ./target/test-metrics
-
-# appender.metrics.type = Console
-appender.metrics.type = RollingFile
-appender.metrics.name = LoggingMetricsOutput
-appender.metrics.fileName = ${metricsFilename}.metrics
-appender.metrics.filePattern = ${metricsFilename}-%d{MM-dd-yy-HH}-%i.metrics.gz
-appender.metrics.layout.type = PatternLayout
-appender.metrics.layout.pattern = METRICS: %d{ISO8601}, %m%n
-appender.metrics.policies.type = Policies
-appender.metrics.policies.time.type = TimeBasedTriggeringPolicy
-appender.metrics.policies.time.interval = 1
-appender.metrics.policies.time.modulate = true
-appender.metrics.policies.size.type = SizeBasedTriggeringPolicy
-appender.metrics.policies.size.size=100MB
-appender.metrics.strategy.type = DefaultRolloverStrategy
-appender.metrics.strategy.max = 10
-
-logger.metrics.name = org.apache.accumulo.metrics
-logger.metrics.level = info
-logger.metrics.additivity = false
-logger.metrics.appenderRef.metrics.ref = LoggingMetricsOutput
-
rootLogger.level = debug
rootLogger.appenderRef.console.ref = STDOUT