This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 66e2741  NIFI-5535 NIFI-4713 This closes #4858. Change metric tagging in DataDogReportingTask
66e2741 is described below

commit 66e2741871e4de25631ffe32b961a622ca5e3ceb
Author: Florimond Manca <[email protected]>
AuthorDate: Fri Oct 30 17:26:44 2020 +0100

    NIFI-5535 NIFI-4713 This closes #4858. Change metric tagging in DataDogReportingTask
    
    Remove low-value default tags
    
    Send processor name as tag instead of as part of metric name
    
    Signed-off-by: Joe Witt <[email protected]>
---
 .../reporting/datadog/DDMetricRegistryBuilder.java | 11 +-----
 .../reporting/datadog/DataDogReportingTask.java    | 36 +++++++----------
 .../reporting/datadog/metrics/MetricNames.java     | 14 -------
 .../reporting/datadog/metrics/MetricsService.java  | 45 +++-------------------
 .../datadog/TestDataDogReportingTask.java          | 35 ++++++++---------
 5 files changed, 38 insertions(+), 103 deletions(-)

diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java
index c00eec5..f53604d 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DDMetricRegistryBuilder.java
@@ -24,8 +24,6 @@ import org.coursera.metrics.datadog.transport.UdpTransport;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
 
 /**
  * Class configures MetricRegistry (passed outside or created from scratch) 
with Datadog support
@@ -34,7 +32,6 @@ public class DDMetricRegistryBuilder {
 
 
     private MetricRegistry metricRegistry = null;
-    private List<String> tags = Arrays.asList();
     private DatadogReporter datadogReporter;
     private String apiKey = "";
     private Transport transport;
@@ -44,11 +41,6 @@ public class DDMetricRegistryBuilder {
         return this;
     }
 
-    public DDMetricRegistryBuilder setTags(List<String> tags) {
-        this.tags = tags;
-        return this;
-    }
-
     public DatadogReporter getDatadogReporter() {
         return datadogReporter;
     }
@@ -86,8 +78,7 @@ public class DDMetricRegistryBuilder {
                 DatadogReporter.forRegistry(metricRegistry)
                         .withHost(InetAddress.getLocalHost().getHostName())
                         .withTransport(transport)
-                        .withTags(tags)
                         .build();
         return reporter;
     }
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
index 3747571..8ea2da0 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java
@@ -18,7 +18,6 @@ package org.apache.nifi.reporting.datadog;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AtomicDouble;
@@ -114,8 +113,7 @@ public class DataDogReportingTask extends 
AbstractReportingTask {
         metricsPrefix = METRICS_PREFIX.getDefaultValue();
         environment = ENVIRONMENT.getDefaultValue();
         virtualMachineMetrics = JmxJvmMetrics.getInstance();
-        ddMetricRegistryBuilder.setMetricRegistry(metricRegistry)
-                .setTags(metricsService.getAllTagsList());
+        ddMetricRegistryBuilder.setMetricRegistry(metricRegistry);
     }
 
     @Override
@@ -145,9 +143,9 @@ public class DataDogReportingTask extends 
AbstractReportingTask {
         ddMetricRegistryBuilder.getDatadogReporter().report();
     }
 
-    protected void updateMetrics(Map<String, Double> metrics, Optional<String> 
processorName, Map<String, String> tags) {
+    protected void updateMetrics(Map<String, Double> metrics, Map<String, 
String> tags) {
         for (Map.Entry<String, Double> entry : metrics.entrySet()) {
-            final String metricName = buildMetricName(processorName, 
entry.getKey());
+            final String metricName = buildMetricName(entry.getKey());
             logger.debug(metricName + ": " + entry.getValue());
             //if metric is not registered yet - register it
             if (!metricsMap.containsKey(metricName)) {
@@ -163,37 +161,31 @@ public class DataDogReportingTask extends 
AbstractReportingTask {
         final List<ProcessorStatus> processorStatuses = new ArrayList<>();
         populateProcessorStatuses(processGroupStatus, processorStatuses);
         for (final ProcessorStatus processorStatus : processorStatuses) {
-            updateMetrics(metricsService.getProcessorMetrics(processorStatus),
-                    Optional.of(processorStatus.getName()), defaultTags);
+            final Map<String, String> processorTags = new 
HashMap<>(defaultTags);
+            
processorTags.putAll(metricsService.getProcessorTags(processorStatus));
+            updateMetrics(metricsService.getProcessorMetrics(processorStatus), 
processorTags);
         }
 
         final List<ConnectionStatus> connectionStatuses = new ArrayList<>();
         populateConnectionStatuses(processGroupStatus, connectionStatuses);
         for (ConnectionStatus connectionStatus: connectionStatuses) {
-            Map<String, String> connectionStatusTags = new 
HashMap<>(defaultTags);
-            
connectionStatusTags.putAll(metricsService.getConnectionStatusTags(connectionStatus));
-            
updateMetrics(metricsService.getConnectionStatusMetrics(connectionStatus), 
Optional.<String>absent(), connectionStatusTags);
+            
updateMetrics(metricsService.getConnectionStatusMetrics(connectionStatus), 
defaultTags);
         }
 
         final List<PortStatus> inputPortStatuses = new ArrayList<>();
         populateInputPortStatuses(processGroupStatus, inputPortStatuses);
         for (PortStatus portStatus: inputPortStatuses) {
-            Map<String, String> portTags = new HashMap<>(defaultTags);
-            portTags.putAll(metricsService.getPortStatusTags(portStatus));
-            updateMetrics(metricsService.getPortStatusMetrics(portStatus), 
Optional.<String>absent(), portTags);
+            updateMetrics(metricsService.getPortStatusMetrics(portStatus), 
defaultTags);
         }
 
         final List<PortStatus> outputPortStatuses = new ArrayList<>();
         populateOutputPortStatuses(processGroupStatus, outputPortStatuses);
         for (PortStatus portStatus: outputPortStatuses) {
-            Map<String, String> portTags = new HashMap<>(defaultTags);
-            portTags.putAll(metricsService.getPortStatusTags(portStatus));
-            updateMetrics(metricsService.getPortStatusMetrics(portStatus), 
Optional.<String>absent(), portTags);
+            updateMetrics(metricsService.getPortStatusMetrics(portStatus), 
defaultTags);
         }
 
-        updateMetrics(metricsService.getJVMMetrics(virtualMachineMetrics),
-                Optional.<String>absent(), defaultTags);
-        updateMetrics(metricsService.getDataFlowMetrics(processGroupStatus), 
Optional.<String>absent(), defaultTags);
+        updateMetrics(metricsService.getJVMMetrics(virtualMachineMetrics), 
defaultTags);
+        updateMetrics(metricsService.getDataFlowMetrics(processGroupStatus), 
defaultTags);
     }
 
     private class MetricGauge implements Gauge, DynamicTagsCallback {
@@ -258,8 +250,8 @@ public class DataDogReportingTask extends 
AbstractReportingTask {
         }
     }
 
-    private String buildMetricName(Optional<String> processorName, String 
metricName) {
-        return metricsPrefix + "." + processorName.or("flow") + "." + 
metricName;
+    private String buildMetricName(String metricName) {
+        return metricsPrefix + "." + metricName;
     }
 
     protected MetricsService getMetricsService() {
@@ -277,4 +269,4 @@ public class DataDogReportingTask extends 
AbstractReportingTask {
     protected ConcurrentHashMap<String, AtomicDouble> getMetricsMap() {
         return new ConcurrentHashMap<>();
     }
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java
index b176a33..58f5e34 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java
@@ -57,18 +57,4 @@ public interface MetricNames {
     //Connection status metrics
     String QUEUED_COUNT = "QueuedCount";
     String QUEUED_BYTES = "QueuedBytes";
-
-    //Port status tags
-    String PORT_ID = "port-id";
-    String PORT_GROUP_ID = "port-group-id";
-    String PORT_NAME = "port-name";
-
-    //Connection status tags
-    String CONNECTION_ID = "connection-id";
-    String CONNECTION_GROUP_ID = "connection-group-id";
-    String CONNECTION_NAME = "connection-name";
-    String CONNECTION_SOURCE_ID = "connection-source-id";
-    String CONNECTION_SOURCE_NAME = "connection-source-name";
-    String CONNECTION_DESTINATION_ID = "connection-destination-id";
-    String CONNECTTION_DESTINATION_NAME = "connection-destination-name";
 }
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java
index 3e26073..6c743b6 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java
@@ -23,9 +23,7 @@ import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.metrics.jvm.JmxJvmMetrics;
 import org.apache.nifi.processor.DataUnit;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -46,6 +44,12 @@ public class MetricsService {
         return metrics;
     }
 
+    public Map<String, String> getProcessorTags(ProcessorStatus status) {
+        Map<String, String> tags = new HashMap<>();
+        tags.put("processor", status.getName());
+        return tags;
+    }
+
     public Map<String, Double> getPortStatusMetrics(PortStatus status){
         final Map<String, Double> metrics = new HashMap<>();
         metrics.put(MetricNames.ACTIVE_THREADS, new 
Double(status.getActiveThreadCount()));
@@ -60,26 +64,6 @@ public class MetricsService {
         return metrics;
     }
 
-    public Map<String,String> getPortStatusTags(PortStatus status) {
-        final Map<String, String> portTags = new HashMap<>();
-        portTags.put(MetricNames.PORT_ID, status.getId());
-        portTags.put(MetricNames.PORT_GROUP_ID, status.getGroupId());
-        portTags.put(MetricNames.PORT_NAME, status.getName());
-        return portTags;
-    }
-
-    public Map<String,String> getConnectionStatusTags(ConnectionStatus status) 
{
-        final Map<String, String> connectionTags = new HashMap<>();
-        connectionTags.put(MetricNames.CONNECTION_ID, status.getId());
-        connectionTags.put(MetricNames.CONNECTION_NAME, status.getName());
-        connectionTags.put(MetricNames.CONNECTION_GROUP_ID, 
status.getGroupId());
-        connectionTags.put(MetricNames.CONNECTION_DESTINATION_ID, 
status.getDestinationId());
-        connectionTags.put(MetricNames.CONNECTTION_DESTINATION_NAME, 
status.getDestinationName());
-        connectionTags.put(MetricNames.CONNECTION_SOURCE_ID, 
status.getSourceId());
-        connectionTags.put(MetricNames.CONNECTION_SOURCE_NAME, 
status.getSourceName());
-        return connectionTags;
-    }
-
     public Map<String, Double> getConnectionStatusMetrics(ConnectionStatus 
status) {
         final Map<String, Double> metrics = new HashMap<>();
         metrics.put(MetricNames.INPUT_COUNT, new 
Double(status.getInputCount()));
@@ -109,23 +93,6 @@ public class MetricsService {
         return metrics;
     }
 
-    public List<String> getAllTagsList() {
-        List<String> tagsList = new ArrayList<>();
-        tagsList.add("env");
-        tagsList.add("dataflow_id");
-        tagsList.add(MetricNames.PORT_ID);
-        tagsList.add(MetricNames.PORT_NAME);
-        tagsList.add(MetricNames.PORT_GROUP_ID);
-        tagsList.add(MetricNames.CONNECTION_ID);
-        tagsList.add(MetricNames.CONNECTION_NAME);
-        tagsList.add(MetricNames.CONNECTION_GROUP_ID);
-        tagsList.add(MetricNames.CONNECTION_SOURCE_ID);
-        tagsList.add(MetricNames.CONNECTION_SOURCE_NAME);
-        tagsList.add(MetricNames.CONNECTION_DESTINATION_ID);
-        tagsList.add(MetricNames.CONNECTTION_DESTINATION_NAME);
-        return tagsList;
-    }
-
     //virtual machine metrics
     public Map<String, Double> getJVMMetrics(JmxJvmMetrics 
virtualMachineMetrics) {
         final Map<String, Double> metrics = new HashMap<>();
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
index 2b5de42..71f7227 100644
--- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
@@ -18,7 +18,6 @@ package org.apache.nifi.reporting.datadog;
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -119,13 +118,13 @@ public class TestDataDogReportingTask {
         DataDogReportingTask dataDogReportingTask = new 
TestableDataDogReportingTask();
         dataDogReportingTask.initialize(initContext);
         dataDogReportingTask.setup(configurationContext);
-        dataDogReportingTask.updateMetrics(processorMetrics, 
Optional.of("sampleProcessor"), tagsMap);
+        dataDogReportingTask.updateMetrics(processorMetrics, tagsMap);
 
-        
verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesReceivedLast5Minutes"),
 Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.sampleProcessor.ActiveThreads"), 
Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesWrittenLast5Minutes"),
 Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesReadLast5Minutes"),
 Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesSentLast5Minutes"),
 Mockito.<Gauge>any());
+        
verify(metricRegistry).register(eq("nifi.FlowFilesReceivedLast5Minutes"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.ActiveThreads"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.BytesWrittenLast5Minutes"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.BytesReadLast5Minutes"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.FlowFilesSentLast5Minutes"), 
Mockito.<Gauge>any());
     }
 
     //test updating JMV metrics
@@ -139,17 +138,17 @@ public class TestDataDogReportingTask {
         dataDogReportingTask.initialize(initContext);
         dataDogReportingTask.setup(configurationContext);
 
-        dataDogReportingTask.updateMetrics(processorMetrics, 
Optional.<String>absent(), tagsMap);
-        verify(metricRegistry).register(eq("nifi.flow.jvm.heap_usage"), 
Mockito.<Gauge>any());
-        verify(metricRegistry).register(eq("nifi.flow.jvm.thread_count"), 
Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.terminated"), 
Mockito.<Gauge>any());
-        verify(metricRegistry).register(eq("nifi.flow.jvm.heap_used"), 
Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.runnable"), 
Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.timed_waiting"),
 Mockito.<Gauge>any());
-        verify(metricRegistry).register(eq("nifi.flow.jvm.uptime"), 
Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.flow.jvm.daemon_thread_count"), 
Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.flow.jvm.file_descriptor_usage"), 
Mockito.<Gauge>any());
-        
verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.blocked"), 
Mockito.<Gauge>any());
+        dataDogReportingTask.updateMetrics(processorMetrics, tagsMap);
+        verify(metricRegistry).register(eq("nifi.jvm.heap_usage"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.jvm.thread_count"), 
Mockito.<Gauge>any());
+        
verify(metricRegistry).register(eq("nifi.jvm.thread_states.terminated"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.jvm.heap_used"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.jvm.thread_states.runnable"), 
Mockito.<Gauge>any());
+        
verify(metricRegistry).register(eq("nifi.jvm.thread_states.timed_waiting"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.jvm.uptime"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.jvm.daemon_thread_count"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.jvm.file_descriptor_usage"), 
Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.jvm.thread_states.blocked"), 
Mockito.<Gauge>any());
     }
 
 

Reply via email to