http://git-wip-us.apache.org/repos/asf/nifi/blob/376d3c4e/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
new file mode 100644
index 0000000..b785d40
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.datadog.metrics.MetricsService;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.logging.Logger;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.verify;
+
+
+public class TestDataDogReportingTask {
+
+    private ProcessGroupStatus status;
+    private ProcessorStatus procStatus;
+    private ConcurrentHashMap<String, AtomicDouble> metricsMap;
+    private MetricRegistry metricRegistry;
+    private MetricsService metricsService;
+    private String env = "dev";
+    private String prefix = "nifi";
+    private ReportingContext context;
+    private ReportingInitializationContext initContext;
+    private ConfigurationContext configurationContext;
+    private volatile VirtualMachineMetrics virtualMachineMetrics;
+    private Logger logger;
+
+    @Before
+    public void setup() {
+        initProcessGroupStatus();
+        initProcessorStatuses();
+        initContexts();
+    }
+
+    //init all contexts
+    private void initContexts() {
+        configurationContext = Mockito.mock(ConfigurationContext.class);
+        context = Mockito.mock(ReportingContext.class);
+        Mockito.when(context.getProperty(DataDogReportingTask.ENVIRONMENT))
+                .thenReturn(new MockPropertyValue(env, null));
+        Mockito.when(context.getProperty(DataDogReportingTask.METRICS_PREFIX))
+                .thenReturn(new MockPropertyValue(prefix, null));
+        Mockito.when(context.getProperty(DataDogReportingTask.API_KEY))
+                .thenReturn(new MockPropertyValue("agent", null));
+        Mockito.when(context.getProperty(DataDogReportingTask.DATADOG_TRANSPORT))
+                .thenReturn(new MockPropertyValue("DataDog Agent", null));
+        EventAccess eventAccess = Mockito.mock(EventAccess.class);
+        Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
+        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
+
+        logger = Mockito.mock(Logger.class);
+        initContext = Mockito.mock(ReportingInitializationContext.class);
+        Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        //Mockito.when(initContext.getLogger()).thenReturn(logger);
+        metricsMap = new ConcurrentHashMap<>();
+        metricRegistry = Mockito.mock(MetricRegistry.class);
+        virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+        metricsService = Mockito.mock(MetricsService.class);
+
+    }
+
+    //test onTrigger method
+    @Test
+    public void testOnTrigger() throws InitializationException, IOException {
+        DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
+        dataDogReportingTask.initialize(initContext);
+        dataDogReportingTask.setup(configurationContext);
+        dataDogReportingTask.onTrigger(context);
+
+        verify(metricsService, atLeast(1)).getProcessorMetrics(Mockito.<ProcessorStatus>any());
+        verify(metricsService, atLeast(1)).getJVMMetrics(Mockito.<VirtualMachineMetrics>any());
+    }
+
+
+    //test updating metrics of processors
+    @Test
+    public void testUpdateMetricsProcessor() throws InitializationException, IOException {
+        MetricsService ms = new MetricsService();
+        Map<String, Double> processorMetrics = ms.getProcessorMetrics(procStatus);
+        Map<String, String> tagsMap = ImmutableMap.of("env", "test");
+        DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
+        dataDogReportingTask.initialize(initContext);
+        dataDogReportingTask.setup(configurationContext);
+        dataDogReportingTask.updateMetrics(processorMetrics, Optional.of("sampleProcessor"), tagsMap);
+
+        verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesReceivedLast5Minutes"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.sampleProcessor.ActiveThreads"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesWrittenLast5Minutes"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesReadLast5Minutes"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesSentLast5Minutes"), Mockito.<Gauge>any());
+    }
+
+    //test updating JVM metrics
+    @Test
+    public void testUpdateMetricsJVM() throws InitializationException, IOException {
+        MetricsService ms = new MetricsService();
+        Map<String, Double> processorMetrics = ms.getJVMMetrics(virtualMachineMetrics);
+        Map<String, String> tagsMap = ImmutableMap.of("env", "test");
+
+        DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask();
+        dataDogReportingTask.initialize(initContext);
+        dataDogReportingTask.setup(configurationContext);
+
+        dataDogReportingTask.updateMetrics(processorMetrics, Optional.<String>absent(), tagsMap);
+        verify(metricRegistry).register(eq("nifi.flow.jvm.heap_usage"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.flow.jvm.thread_count"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.terminated"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.flow.jvm.heap_used"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.runnable"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.timed_waiting"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.flow.jvm.uptime"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.flow.jvm.daemon_thread_count"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.flow.jvm.file_descriptor_usage"), Mockito.<Gauge>any());
+        verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.blocked"), Mockito.<Gauge>any());
+    }
+
+
+    private void initProcessGroupStatus() {
+        status = new ProcessGroupStatus();
+        status.setId("1234");
+        status.setFlowFilesReceived(5);
+        status.setBytesReceived(10000);
+        status.setFlowFilesSent(10);
+        status.setBytesSent(20000);
+        status.setQueuedCount(100);
+        status.setQueuedContentSize(1024L);
+        status.setBytesRead(60000L);
+        status.setBytesWritten(80000L);
+        status.setActiveThreadCount(5);
+        status.setInputCount(2);
+        status.setOutputCount(4);
+    }
+
+    private void initProcessorStatuses() {
+        procStatus = new ProcessorStatus();
+        procStatus.setProcessingNanos(123456789);
+        procStatus.setInputCount(2);
+        procStatus.setOutputCount(4);
+        procStatus.setActiveThreadCount(6);
+        procStatus.setBytesSent(1256);
+        procStatus.setName("sampleProcessor");
+        Collection<ProcessorStatus> processorStatuses = new ArrayList<>();
+        processorStatuses.add(procStatus);
+        status.setProcessorStatus(processorStatuses);
+
+        ProcessGroupStatus groupStatus = new ProcessGroupStatus();
+        groupStatus.setProcessorStatus(processorStatuses);
+
+        Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>();
+        groupStatuses.add(groupStatus);
+        status.setProcessGroupStatus(groupStatuses);
+    }
+
+    private class TestableDataDogReportingTask extends DataDogReportingTask {
+        @Override
+        protected MetricsService getMetricsService() {
+            return metricsService;
+        }
+
+        @Override
+        protected DDMetricRegistryBuilder getMetricRegistryBuilder() {
+            return new DDMetricRegistryBuilder();
+        }
+
+        @Override
+        protected MetricRegistry getMetricRegistry() {
+            return metricRegistry;
+        }
+
+        @Override
+        protected ConcurrentHashMap<String, AtomicDouble> getMetricsMap() {
+            return metricsMap;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/376d3c4e/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java
new file mode 100644
index 0000000..f50cbb1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.datadog;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.datadog.metrics.MetricNames;
+import org.apache.nifi.reporting.datadog.metrics.MetricsService;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class TestMetricsService {
+
+    private ProcessGroupStatus status;
+    private MetricsService metricsService;
+
+    @Before
+    public void init() {
+        status = new ProcessGroupStatus();
+        metricsService = new MetricsService();
+        status.setId("1234");
+        status.setFlowFilesReceived(5);
+        status.setBytesReceived(10000);
+        status.setFlowFilesSent(10);
+        status.setBytesSent(20000);
+        status.setQueuedCount(100);
+        status.setQueuedContentSize(1024L);
+        status.setBytesRead(60000L);
+        status.setBytesWritten(80000L);
+        status.setActiveThreadCount(5);
+    }
+
+    //test group status metric retrieval
+    @Test
+    public void testGetProcessGroupStatusMetrics() {
+        ProcessorStatus procStatus = new ProcessorStatus();
+        List<ProcessorStatus> processorStatuses = new ArrayList<>();
+        processorStatuses.add(procStatus);
+        status.setProcessorStatus(processorStatuses);
+
+        final Map<String, Double> metrics = metricsService.getDataFlowMetrics(status);
+
+        Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
+        Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED));
+        Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT));
+        Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_SENT));
+        Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_QUEUED));
+        Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_QUEUED));
+        Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ));
+        Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN));
+        Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
+    }
+
+    //test processor status metric retrieval
+    @Test
+    public void testGetProcessorGroupStatusMetrics() {
+        ProcessorStatus procStatus = new ProcessorStatus();
+        List<ProcessorStatus> processorStatuses = new ArrayList<>();
+        processorStatuses.add(procStatus);
+        status.setProcessorStatus(processorStatuses);
+
+        final Map<String, Double> metrics = metricsService.getProcessorMetrics(procStatus);
+
+        Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ));
+        Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN));
+        Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED));
+        Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT));
+        Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS));
+    }
+
+    //test JVM status metric retrieval
+    @Test
+    public void testGetVirtualMachineMetrics() {
+        final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+
+        final Map<String, Double> metrics = metricsService.getJVMMetrics(virtualMachineMetrics);
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_UPTIME));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USED));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USAGE));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_NON_HEAP_USAGE));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_RUNNABLE));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_BLOCKED));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TIMED_WAITING));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TERMINATED));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_COUNT));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_DAEMON_THREAD_COUNT));
+        Assert.assertTrue(metrics.containsKey(MetricNames.JVM_FILE_DESCRIPTOR_USAGE));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/376d3c4e/nifi-nar-bundles/nifi-datadog-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-datadog-bundle/pom.xml b/nifi-nar-bundles/nifi-datadog-bundle/pom.xml
new file mode 100644
index 0000000..eb6b87c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-datadog-bundle/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-datadog-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-datadog-reporting-task</module>
+        <module>nifi-datadog-nar</module>
+    </modules>
+    
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.glassfish.jersey.core</groupId>
+                <artifactId>jersey-client</artifactId>
+                <version>2.19</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/376d3c4e/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
old mode 100644
new mode 100755
index 6ebd2da..f785c87
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -66,6 +66,7 @@
         <module>nifi-evtx-bundle</module>
         <module>nifi-slack-bundle</module>
         <module>nifi-snmp-bundle</module>
+        <module>nifi-datadog-bundle</module>
         <module>nifi-windows-event-log-bundle</module>
         <module>nifi-ignite-bundle</module>
         <module>nifi-email-bundle</module>

Reply via email to