This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ccb76c0a2a ARTEMIS-4064 Harden MetricsManager
ccb76c0a2a is described below
commit ccb76c0a2a1055e83ddfdbbda5049b97b2389b16
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Oct 20 13:47:18 2022 -0500
ARTEMIS-4064 Harden MetricsManager
In order to improve trouble-shooting for the MetricsManager there should
be additional logging and exceptions. In all, this commit contains the
following changes:
- Additional logging
- Throw an exception when registering meters if meters already exist
- Rename a few variables & methods to more clearly identify what they
are used for
- Upgrade Micrometer to 1.9.5
- Simplify/clarify a few blocks of code
- No longer pass the ManagementServiceImpl when registering the
metrics, but instead pass the Object the meter is observing (e.g.
broker, address, or queue)
---
.../artemis/core/server/ActiveMQMessageBundle.java | 3 +
.../management/impl/ManagementServiceImpl.java | 58 ++++++++---------
.../core/server/metrics/MetricsManager.java | 72 ++++++++++------------
pom.xml | 2 +-
4 files changed, 67 insertions(+), 68 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 9ca0a1127d..aadb9ce633 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -521,4 +521,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229243, value = "Embedded web server restart failed")
ActiveMQException embeddedWebServerRestartFailed(Exception e);
+
+ @Message(id = 229244, value = "Meters already registered for {}")
+ IllegalStateException metersAlreadyRegistered(String resource);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index d7c3a3342f..d2f6dbee3a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -227,11 +227,11 @@ public class ManagementServiceImpl implements
ManagementService {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerBrokerGauge(builder -> {
- builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics
-> Double.valueOf(messagingServer.getConnectionCount()),
ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
- builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this,
metrics -> Double.valueOf(messagingServer.getTotalConnectionCount()),
ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
- builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this,
metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()),
ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
-
builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this,
metrics ->
Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()),
ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
- builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics
-> Double.valueOf(messagingServer.getDiskStoreUsage()),
ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
+ builder.build(BrokerMetricNames.CONNECTION_COUNT, messagingServer,
metrics -> Double.valueOf(messagingServer.getConnectionCount()),
ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
+ builder.build(BrokerMetricNames.TOTAL_CONNECTION_COUNT,
messagingServer, metrics ->
Double.valueOf(messagingServer.getTotalConnectionCount()),
ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
+ builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE,
messagingServer, metrics ->
Double.valueOf(messagingServerControl.getAddressMemoryUsage()),
ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
+ builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE,
messagingServer, metrics ->
Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()),
ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
+ builder.build(BrokerMetricNames.DISK_STORE_USAGE, messagingServer,
metrics -> Double.valueOf(messagingServer.getDiskStoreUsage()),
ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
});
}
}
@@ -266,10 +266,10 @@ public class ManagementServiceImpl implements
ManagementService {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerAddressGauge(addressInfo.getName().toString(), builder
-> {
- builder.register(AddressMetricNames.ROUTED_MESSAGE_COUNT, this,
metrics -> Double.valueOf(addressInfo.getRoutedMessageCount()),
AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION);
- builder.register(AddressMetricNames.UNROUTED_MESSAGE_COUNT,
this, metrics -> Double.valueOf(addressInfo.getUnRoutedMessageCount()),
AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION);
- builder.register(AddressMetricNames.ADDRESS_SIZE, this, metrics
-> Double.valueOf(addressControl.getAddressSize()),
AddressControl.ADDRESS_SIZE_DESCRIPTION);
- builder.register(AddressMetricNames.PAGES_COUNT, this, metrics
-> Double.valueOf(addressControl.getNumberOfPages()),
AddressControl.NUMBER_OF_PAGES_DESCRIPTION);
+ builder.build(AddressMetricNames.ROUTED_MESSAGE_COUNT,
addressInfo, metrics -> Double.valueOf(addressInfo.getRoutedMessageCount()),
AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(AddressMetricNames.UNROUTED_MESSAGE_COUNT,
addressInfo, metrics -> Double.valueOf(addressInfo.getUnRoutedMessageCount()),
AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(AddressMetricNames.ADDRESS_SIZE, addressInfo,
metrics -> Double.valueOf(addressControl.getAddressSize()),
AddressControl.ADDRESS_SIZE_DESCRIPTION);
+ builder.build(AddressMetricNames.PAGES_COUNT, addressInfo,
metrics -> Double.valueOf(addressControl.getNumberOfPages()),
AddressControl.NUMBER_OF_PAGES_DESCRIPTION);
});
}
}
@@ -330,26 +330,26 @@ public class ManagementServiceImpl implements
ManagementService {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerQueueGauge(queue.getAddress().toString(),
queue.getName().toString(), (builder) -> {
- builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics
-> Double.valueOf(queue.getMessageCount()),
QueueControl.MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this,
metrics -> Double.valueOf(queue.getDurableMessageCount()),
QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.PERSISTENT_SIZE, this,
metrics -> Double.valueOf(queue.getPersistentSize()),
QueueControl.PERSISTENT_SIZE_DESCRIPTION);
- builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE,
this, metrics -> Double.valueOf(queue.getDurablePersistentSize()),
QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
-
- builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT,
this, metrics -> Double.valueOf(queue.getDeliveringCount()),
QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
-
builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this,
metrics -> Double.valueOf(queue.getDurableDeliveringCount()),
QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE,
this, metrics -> Double.valueOf(queue.getDeliveringSize()),
QueueControl.DELIVERING_SIZE_DESCRIPTION);
-
builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this,
metrics -> Double.valueOf(queue.getDurableDeliveringSize()),
QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
-
- builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT,
this, metrics -> Double.valueOf(queue.getScheduledCount()),
QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
-
builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this,
metrics -> Double.valueOf(queue.getDurableScheduledCount()),
QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE,
this, metrics -> Double.valueOf(queue.getScheduledSize()),
QueueControl.SCHEDULED_SIZE_DESCRIPTION);
-
builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this,
metrics -> Double.valueOf(queue.getDurableScheduledSize()),
QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
-
- builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this,
metrics -> Double.valueOf(queue.getMessagesAcknowledged()),
QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
- builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics
-> Double.valueOf(queue.getMessagesAdded()),
QueueControl.MESSAGES_ADDED_DESCRIPTION);
- builder.register(QueueMetricNames.MESSAGES_KILLED, this,
metrics -> Double.valueOf(queue.getMessagesKilled()),
QueueControl.MESSAGES_KILLED_DESCRIPTION);
- builder.register(QueueMetricNames.MESSAGES_EXPIRED, this,
metrics -> Double.valueOf(queue.getMessagesExpired()),
QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
- builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics
-> Double.valueOf(queue.getConsumerCount()),
QueueControl.CONSUMER_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.MESSAGE_COUNT, queue, metrics ->
Double.valueOf(queue.getMessageCount()),
QueueControl.MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.DURABLE_MESSAGE_COUNT, queue,
metrics -> Double.valueOf(queue.getDurableMessageCount()),
QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.PERSISTENT_SIZE, queue, metrics
-> Double.valueOf(queue.getPersistentSize()),
QueueControl.PERSISTENT_SIZE_DESCRIPTION);
+ builder.build(QueueMetricNames.DURABLE_PERSISTENT_SIZE, queue,
metrics -> Double.valueOf(queue.getDurablePersistentSize()),
QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
+
+ builder.build(QueueMetricNames.DELIVERING_MESSAGE_COUNT, queue,
metrics -> Double.valueOf(queue.getDeliveringCount()),
QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+
builder.build(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, queue, metrics
-> Double.valueOf(queue.getDurableDeliveringCount()),
QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.DELIVERING_PERSISTENT_SIZE,
queue, metrics -> Double.valueOf(queue.getDeliveringSize()),
QueueControl.DELIVERING_SIZE_DESCRIPTION);
+
builder.build(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, queue,
metrics -> Double.valueOf(queue.getDurableDeliveringSize()),
QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
+
+ builder.build(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, queue,
metrics -> Double.valueOf(queue.getScheduledCount()),
QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT,
queue, metrics -> Double.valueOf(queue.getDurableScheduledCount()),
QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE,
queue, metrics -> Double.valueOf(queue.getScheduledSize()),
QueueControl.SCHEDULED_SIZE_DESCRIPTION);
+
builder.build(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, queue,
metrics -> Double.valueOf(queue.getDurableScheduledSize()),
QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
+
+ builder.build(QueueMetricNames.MESSAGES_ACKNOWLEDGED, queue,
metrics -> Double.valueOf(queue.getMessagesAcknowledged()),
QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
+ builder.build(QueueMetricNames.MESSAGES_ADDED, queue, metrics
-> Double.valueOf(queue.getMessagesAdded()),
QueueControl.MESSAGES_ADDED_DESCRIPTION);
+ builder.build(QueueMetricNames.MESSAGES_KILLED, queue, metrics
-> Double.valueOf(queue.getMessagesKilled()),
QueueControl.MESSAGES_KILLED_DESCRIPTION);
+ builder.build(QueueMetricNames.MESSAGES_EXPIRED, queue, metrics
-> Double.valueOf(queue.getMessagesExpired()),
QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
+ builder.build(QueueMetricNames.CONSUMER_COUNT, queue, metrics
-> Double.valueOf(queue.getConsumerCount()),
QueueControl.CONSUMER_COUNT_DESCRIPTION);
});
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
index d9d19984e8..e5c84d48df 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
@@ -34,6 +34,7 @@ import
io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.slf4j.Logger;
@@ -80,15 +81,14 @@ public class MetricsManager {
@FunctionalInterface
public interface MetricGaugeBuilder {
- void register(String metricName, Object state, ToDoubleFunction f,
String description);
+ void build(String metricName, Object state, ToDoubleFunction f, String
description);
}
public void registerQueueGauge(String address, String queue,
Consumer<MetricGaugeBuilder> builder) {
- final MeterRegistry meterRegistry = this.meterRegistry;
- if (meterRegistry == null ||
!addressSettingsRepository.getMatch(address).isEnableMetrics()) {
+ if (this.meterRegistry == null ||
!addressSettingsRepository.getMatch(address).isEnableMetrics()) {
return;
}
- final List<Gauge.Builder> newMeters = new ArrayList<>();
+ final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
.builder("artemis." + metricName, state, f)
@@ -96,70 +96,66 @@ public class MetricsManager {
.tag("address", address)
.tag("queue", queue)
.description(description);
- newMeters.add(meter);
+ gaugeBuilders.add(meter);
});
- final String resource = ResourceNames.QUEUE + queue;
- registerMeter(newMeters, resource);
+ registerMeters(gaugeBuilders, ResourceNames.QUEUE + queue);
}
public void registerAddressGauge(String address,
Consumer<MetricGaugeBuilder> builder) {
- final MeterRegistry meterRegistry = this.meterRegistry;
- if (meterRegistry == null ||
!addressSettingsRepository.getMatch(address).isEnableMetrics()) {
+ if (this.meterRegistry == null ||
!addressSettingsRepository.getMatch(address).isEnableMetrics()) {
return;
}
- final List<Gauge.Builder> newMeters = new ArrayList<>();
+ final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
.builder("artemis." + metricName, state, f)
.tag("broker", brokerName)
.tag("address", address)
.description(description);
- newMeters.add(meter);
+ gaugeBuilders.add(meter);
});
- final String resource = ResourceNames.ADDRESS + address;
- registerMeter(newMeters, resource);
+ registerMeters(gaugeBuilders, ResourceNames.ADDRESS + address);
}
public void registerBrokerGauge(Consumer<MetricGaugeBuilder> builder) {
- final MeterRegistry meterRegistry = this.meterRegistry;
- if (meterRegistry == null) {
+ if (this.meterRegistry == null) {
return;
}
- final List<Gauge.Builder> newMeters = new ArrayList<>();
+ final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
.builder("artemis." + metricName, state, f)
.tag("broker", brokerName)
.description(description);
- newMeters.add(meter);
+ gaugeBuilders.add(meter);
});
- final String resource = ResourceNames.BROKER + "." + brokerName;
- registerMeter(newMeters, resource);
+ registerMeters(gaugeBuilders, ResourceNames.BROKER + "." + brokerName);
}
- private void registerMeter(List<Gauge.Builder> newMeters, String resource) {
- this.meters.compute(resource, (s, meters) -> {
- //the old meters are ignored on purpose
- meters = new ArrayList<>(newMeters.size());
- for (Gauge.Builder gaugeBuilder : newMeters) {
- Gauge gauge = gaugeBuilder.register(meterRegistry);
- meters.add(gauge);
- logger.debug("Registered meter: {}", gauge.getId());
- }
- return meters;
- });
+ private void registerMeters(List<Gauge.Builder> gaugeBuilders, String
resource) {
+ if (meters.get(resource) != null) {
+ throw ActiveMQMessageBundle.BUNDLE.metersAlreadyRegistered(resource);
+ }
+ logger.debug("Registering meters for {}", resource);
+ List<Meter> newMeters = new ArrayList<>(gaugeBuilders.size());
+ for (Gauge.Builder gaugeBuilder : gaugeBuilders) {
+ Gauge gauge = gaugeBuilder.register(meterRegistry);
+ newMeters.add(gauge);
+ logger.debug("Registered meter: {}", gauge.getId());
+ }
+ meters.put(resource, newMeters);
}
- public void remove(String component) {
- meters.computeIfPresent(component, (s, meters) -> {
- if (meters == null) {
- return null;
- }
- for (Meter meter : meters) {
+ public void remove(String resource) {
+ List<Meter> resourceMeters = meters.remove(resource);
+ if (resourceMeters != null) {
+ logger.debug("Unregistering meters for {}", resource);
+ for (Meter meter : resourceMeters) {
Meter removed = meterRegistry.remove(meter);
logger.debug("Unregistered meter: {}", removed.getId());
}
- return null;
- });
+ } else {
+ logger.debug("Attempted to unregister meters for {}, but none were
found.", resource);
+ }
}
}
diff --git a/pom.xml b/pom.xml
index 147d820bae..5ecce22dd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,7 +147,7 @@
<servicemix.json-1.1.spec.version>2.9.0</servicemix.json-1.1.spec.version>
<version.org.jacoco>0.8.8</version.org.jacoco>
<version.org.jacoco.plugin>0.8.8</version.org.jacoco.plugin>
- <version.micrometer>1.8.5</version.micrometer>
+ <version.micrometer>1.9.5</version.micrometer>
<hamcrest.version>2.1</hamcrest.version>
<junit.version>4.13.2</junit.version>
<surefire.version>2.22.2</surefire.version>