This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 66e2741 NIFI-5535 NIFI-4713 This closes #4858. Change metric tagging
in DataDogReportingTask
66e2741 is described below
commit 66e2741871e4de25631ffe32b961a622ca5e3ceb
Author: Florimond Manca <[email protected]>
AuthorDate: Fri Oct 30 17:26:44 2020 +0100
NIFI-5535 NIFI-4713 This closes #4858. Change metric tagging in
DataDogReportingTask
Remove low-value default tags
Send processor name as tag instead of as part of metric name
Signed-off-by: Joe Witt <[email protected]>
---
.../reporting/datadog/DDMetricRegistryBuilder.java | 11 +-----
.../reporting/datadog/DataDogReportingTask.java | 36 +++++++----------
.../reporting/datadog/metrics/MetricNames.java | 14 -------
.../reporting/datadog/metrics/MetricsService.java | 45 +++-------------------
.../datadog/TestDataDogReportingTask.java | 35 ++++++++---------
5 files changed, 38 insertions(+), 103 deletions(-)
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java
index c00eec5..f53604d 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java
@@ -24,8 +24,6 @@ import org.coursera.metrics.datadog.transport.UdpTransport;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
/**
* Class configures MetricRegistry (passed outside or created from scratch)
with Datadog support
@@ -34,7 +32,6 @@ public class DDMetricRegistryBuilder {
private MetricRegistry metricRegistry = null;
- private List<String> tags = Arrays.asList();
private DatadogReporter datadogReporter;
private String apiKey = "";
private Transport transport;
@@ -44,11 +41,6 @@ public class DDMetricRegistryBuilder {
return this;
}
- public DDMetricRegistryBuilder setTags(List<String> tags) {
- this.tags = tags;
- return this;
- }
-
public DatadogReporter getDatadogReporter() {
return datadogReporter;
}
@@ -86,8 +78,7 @@ public class DDMetricRegistryBuilder {
DatadogReporter.forRegistry(metricRegistry)
.withHost(InetAddress.getLocalHost().getHostName())
.withTransport(transport)
- .withTags(tags)
.build();
return reporter;
}
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
index 3747571..8ea2da0 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
@@ -18,7 +18,6 @@ package org.apache.nifi.reporting.datadog;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AtomicDouble;
@@ -114,8 +113,7 @@ public class DataDogReportingTask extends AbstractReportingTask {
metricsPrefix = METRICS_PREFIX.getDefaultValue();
environment = ENVIRONMENT.getDefaultValue();
virtualMachineMetrics = JmxJvmMetrics.getInstance();
- ddMetricRegistryBuilder.setMetricRegistry(metricRegistry)
- .setTags(metricsService.getAllTagsList());
+ ddMetricRegistryBuilder.setMetricRegistry(metricRegistry);
}
@Override
@@ -145,9 +143,9 @@ public class DataDogReportingTask extends AbstractReportingTask {
ddMetricRegistryBuilder.getDatadogReporter().report();
}
- protected void updateMetrics(Map<String, Double> metrics, Optional<String>
processorName, Map<String, String> tags) {
+ protected void updateMetrics(Map<String, Double> metrics, Map<String,
String> tags) {
for (Map.Entry<String, Double> entry : metrics.entrySet()) {
- final String metricName = buildMetricName(processorName,
entry.getKey());
+ final String metricName = buildMetricName(entry.getKey());
logger.debug(metricName + ": " + entry.getValue());
//if metric is not registered yet - register it
if (!metricsMap.containsKey(metricName)) {
@@ -163,37 +161,31 @@ public class DataDogReportingTask extends AbstractReportingTask {
final List<ProcessorStatus> processorStatuses = new ArrayList<>();
populateProcessorStatuses(processGroupStatus, processorStatuses);
for (final ProcessorStatus processorStatus : processorStatuses) {
- updateMetrics(metricsService.getProcessorMetrics(processorStatus),
- Optional.of(processorStatus.getName()), defaultTags);
+ final Map<String, String> processorTags = new
HashMap<>(defaultTags);
+
processorTags.putAll(metricsService.getProcessorTags(processorStatus));
+ updateMetrics(metricsService.getProcessorMetrics(processorStatus),
processorTags);
}
final List<ConnectionStatus> connectionStatuses = new ArrayList<>();
populateConnectionStatuses(processGroupStatus, connectionStatuses);
for (ConnectionStatus connectionStatus: connectionStatuses) {
- Map<String, String> connectionStatusTags = new
HashMap<>(defaultTags);
-
connectionStatusTags.putAll(metricsService.getConnectionStatusTags(connectionStatus));
-
updateMetrics(metricsService.getConnectionStatusMetrics(connectionStatus),
Optional.<String>absent(), connectionStatusTags);
+
updateMetrics(metricsService.getConnectionStatusMetrics(connectionStatus),
defaultTags);
}
final List<PortStatus> inputPortStatuses = new ArrayList<>();
populateInputPortStatuses(processGroupStatus, inputPortStatuses);
for (PortStatus portStatus: inputPortStatuses) {
- Map<String, String> portTags = new HashMap<>(defaultTags);
- portTags.putAll(metricsService.getPortStatusTags(portStatus));
- updateMetrics(metricsService.getPortStatusMetrics(portStatus),
Optional.<String>absent(), portTags);
+ updateMetrics(metricsService.getPortStatusMetrics(portStatus),
defaultTags);
}
final List<PortStatus> outputPortStatuses = new ArrayList<>();
populateOutputPortStatuses(processGroupStatus, outputPortStatuses);
for (PortStatus portStatus: outputPortStatuses) {
- Map<String, String> portTags = new HashMap<>(defaultTags);
- portTags.putAll(metricsService.getPortStatusTags(portStatus));
- updateMetrics(metricsService.getPortStatusMetrics(portStatus),
Optional.<String>absent(), portTags);
+ updateMetrics(metricsService.getPortStatusMetrics(portStatus),
defaultTags);
}
- updateMetrics(metricsService.getJVMMetrics(virtualMachineMetrics),
- Optional.<String>absent(), defaultTags);
- updateMetrics(metricsService.getDataFlowMetrics(processGroupStatus),
Optional.<String>absent(), defaultTags);
+ updateMetrics(metricsService.getJVMMetrics(virtualMachineMetrics),
defaultTags);
+ updateMetrics(metricsService.getDataFlowMetrics(processGroupStatus),
defaultTags);
}
private class MetricGauge implements Gauge, DynamicTagsCallback {
@@ -258,8 +250,8 @@ public class DataDogReportingTask extends AbstractReportingTask {
}
}
- private String buildMetricName(Optional<String> processorName, String
metricName) {
- return metricsPrefix + "." + processorName.or("flow") + "." +
metricName;
+ private String buildMetricName(String metricName) {
+ return metricsPrefix + "." + metricName;
}
protected MetricsService getMetricsService() {
@@ -277,4 +269,4 @@ public class DataDogReportingTask extends AbstractReportingTask {
protected ConcurrentHashMap<String, AtomicDouble> getMetricsMap() {
return new ConcurrentHashMap<>();
}
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java
index b176a33..58f5e34 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java
@@ -57,18 +57,4 @@ public interface MetricNames {
//Connection status metrics
String QUEUED_COUNT = "QueuedCount";
String QUEUED_BYTES = "QueuedBytes";
-
- //Port status tags
- String PORT_ID = "port-id";
- String PORT_GROUP_ID = "port-group-id";
- String PORT_NAME = "port-name";
-
- //Connection status tags
- String CONNECTION_ID = "connection-id";
- String CONNECTION_GROUP_ID = "connection-group-id";
- String CONNECTION_NAME = "connection-name";
- String CONNECTION_SOURCE_ID = "connection-source-id";
- String CONNECTION_SOURCE_NAME = "connection-source-name";
- String CONNECTION_DESTINATION_ID = "connection-destination-id";
- String CONNECTTION_DESTINATION_NAME = "connection-destination-name";
}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java
index 3e26073..6c743b6 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java
@@ -23,9 +23,7 @@ import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
import org.apache.nifi.processor.DataUnit;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -46,6 +44,12 @@ public class MetricsService {
return metrics;
}
+ public Map<String, String> getProcessorTags(ProcessorStatus status) {
+ Map<String, String> tags = new HashMap<>();
+ tags.put("processor", status.getName());
+ return tags;
+ }
+
public Map<String, Double> getPortStatusMetrics(PortStatus status){
final Map<String, Double> metrics = new HashMap<>();
metrics.put(MetricNames.ACTIVE_THREADS, new
Double(status.getActiveThreadCount()));
@@ -60,26 +64,6 @@ public class MetricsService {
return metrics;
}
- public Map<String,String> getPortStatusTags(PortStatus status) {
- final Map<String, String> portTags = new HashMap<>();
- portTags.put(MetricNames.PORT_ID, status.getId());
- portTags.put(MetricNames.PORT_GROUP_ID, status.getGroupId());
- portTags.put(MetricNames.PORT_NAME, status.getName());
- return portTags;
- }
-
- public Map<String,String> getConnectionStatusTags(ConnectionStatus status)
{
- final Map<String, String> connectionTags = new HashMap<>();
- connectionTags.put(MetricNames.CONNECTION_ID, status.getId());
- connectionTags.put(MetricNames.CONNECTION_NAME, status.getName());
- connectionTags.put(MetricNames.CONNECTION_GROUP_ID,
status.getGroupId());
- connectionTags.put(MetricNames.CONNECTION_DESTINATION_ID,
status.getDestinationId());
- connectionTags.put(MetricNames.CONNECTTION_DESTINATION_NAME,
status.getDestinationName());
- connectionTags.put(MetricNames.CONNECTION_SOURCE_ID,
status.getSourceId());
- connectionTags.put(MetricNames.CONNECTION_SOURCE_NAME,
status.getSourceName());
- return connectionTags;
- }
-
public Map<String, Double> getConnectionStatusMetrics(ConnectionStatus
status) {
final Map<String, Double> metrics = new HashMap<>();
metrics.put(MetricNames.INPUT_COUNT, new
Double(status.getInputCount()));
@@ -109,23 +93,6 @@ public class MetricsService {
return metrics;
}
- public List<String> getAllTagsList() {
- List<String> tagsList = new ArrayList<>();
- tagsList.add("env");
- tagsList.add("dataflow_id");
- tagsList.add(MetricNames.PORT_ID);
- tagsList.add(MetricNames.PORT_NAME);
- tagsList.add(MetricNames.PORT_GROUP_ID);
- tagsList.add(MetricNames.CONNECTION_ID);
- tagsList.add(MetricNames.CONNECTION_NAME);
- tagsList.add(MetricNames.CONNECTION_GROUP_ID);
- tagsList.add(MetricNames.CONNECTION_SOURCE_ID);
- tagsList.add(MetricNames.CONNECTION_SOURCE_NAME);
- tagsList.add(MetricNames.CONNECTION_DESTINATION_ID);
- tagsList.add(MetricNames.CONNECTTION_DESTINATION_NAME);
- return tagsList;
- }
-
//virtual machine metrics
public Map<String, Double> getJVMMetrics(JmxJvmMetrics
virtualMachineMetrics) {
final Map<String, Double> metrics = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
index 2b5de42..71f7227 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
@@ -18,7 +18,6 @@ package org.apache.nifi.reporting.datadog;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.nifi.controller.ConfigurationContext;
@@ -119,13 +118,13 @@ public class TestDataDogReportingTask {
DataDogReportingTask dataDogReportingTask = new
TestableDataDogReportingTask();
dataDogReportingTask.initialize(initContext);
dataDogReportingTask.setup(configurationContext);
- dataDogReportingTask.updateMetrics(processorMetrics,
Optional.of("sampleProcessor"), tagsMap);
+ dataDogReportingTask.updateMetrics(processorMetrics, tagsMap);
-
verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesReceivedLast5Minutes"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.sampleProcessor.ActiveThreads"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesWrittenLast5Minutes"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesReadLast5Minutes"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesSentLast5Minutes"),
Mockito.<Gauge>any());
+
verify(metricRegistry).register(eq("nifi.FlowFilesReceivedLast5Minutes"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.ActiveThreads"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.BytesWrittenLast5Minutes"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.BytesReadLast5Minutes"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.FlowFilesSentLast5Minutes"),
Mockito.<Gauge>any());
}
//test updating JMV metrics
@@ -139,17 +138,17 @@ public class TestDataDogReportingTask {
dataDogReportingTask.initialize(initContext);
dataDogReportingTask.setup(configurationContext);
- dataDogReportingTask.updateMetrics(processorMetrics,
Optional.<String>absent(), tagsMap);
- verify(metricRegistry).register(eq("nifi.flow.jvm.heap_usage"),
Mockito.<Gauge>any());
- verify(metricRegistry).register(eq("nifi.flow.jvm.thread_count"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.terminated"),
Mockito.<Gauge>any());
- verify(metricRegistry).register(eq("nifi.flow.jvm.heap_used"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.runnable"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.timed_waiting"),
Mockito.<Gauge>any());
- verify(metricRegistry).register(eq("nifi.flow.jvm.uptime"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.flow.jvm.daemon_thread_count"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.flow.jvm.file_descriptor_usage"),
Mockito.<Gauge>any());
-
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.blocked"),
Mockito.<Gauge>any());
+ dataDogReportingTask.updateMetrics(processorMetrics, tagsMap);
+ verify(metricRegistry).register(eq("nifi.jvm.heap_usage"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.jvm.thread_count"),
Mockito.<Gauge>any());
+
verify(metricRegistry).register(eq("nifi.jvm.thread_states.terminated"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.jvm.heap_used"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.jvm.thread_states.runnable"),
Mockito.<Gauge>any());
+
verify(metricRegistry).register(eq("nifi.jvm.thread_states.timed_waiting"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.jvm.uptime"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.jvm.daemon_thread_count"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.jvm.file_descriptor_usage"),
Mockito.<Gauge>any());
+ verify(metricRegistry).register(eq("nifi.jvm.thread_states.blocked"),
Mockito.<Gauge>any());
}