http://git-wip-us.apache.org/repos/asf/nifi/blob/376d3c4e/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java new file mode 100644 index 0000000..b785d40 --- /dev/null +++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.datadog; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.AtomicDouble; +import com.yammer.metrics.core.VirtualMachineMetrics; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.EventAccess; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.reporting.datadog.metrics.MetricsService; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.verify; + + +public class TestDataDogReportingTask { + + private ProcessGroupStatus status; + private ProcessorStatus procStatus; + private ConcurrentHashMap<String, AtomicDouble> metricsMap; + private MetricRegistry metricRegistry; + private MetricsService metricsService; + private String env = "dev"; + private String prefix = "nifi"; + private ReportingContext context; + private ReportingInitializationContext initContext; + private ConfigurationContext configurationContext; + private volatile VirtualMachineMetrics virtualMachineMetrics; + private Logger logger; + + @Before + public void setup() { + initProcessGroupStatus(); + initProcessorStatuses(); + initContexts(); + } + + //init all contexts + private void initContexts() { + configurationContext = Mockito.mock(ConfigurationContext.class); + context = Mockito.mock(ReportingContext.class); + Mockito.when(context.getProperty(DataDogReportingTask.ENVIRONMENT)) + .thenReturn(new MockPropertyValue(env, null)); + Mockito.when(context.getProperty(DataDogReportingTask.METRICS_PREFIX)) + .thenReturn(new MockPropertyValue(prefix, null)); + Mockito.when(context.getProperty(DataDogReportingTask.API_KEY)) + .thenReturn(new MockPropertyValue("agent", null)); + Mockito.when(context.getProperty(DataDogReportingTask.DATADOG_TRANSPORT)) + .thenReturn(new MockPropertyValue("DataDog Agent", null)); + EventAccess eventAccess = Mockito.mock(EventAccess.class); + Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); + Mockito.when(context.getEventAccess()).thenReturn(eventAccess); + + logger = Mockito.mock(Logger.class); + initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + //Mockito.when(initContext.getLogger()).thenReturn(logger); + metricsMap = new ConcurrentHashMap<>(); + metricRegistry = Mockito.mock(MetricRegistry.class); + virtualMachineMetrics = VirtualMachineMetrics.getInstance(); + metricsService = Mockito.mock(MetricsService.class); + + } + + //test onTrigger method + @Test + public void testOnTrigger() throws InitializationException, IOException { + DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask(); + dataDogReportingTask.initialize(initContext); + dataDogReportingTask.setup(configurationContext); + dataDogReportingTask.onTrigger(context); + + verify(metricsService, atLeast(1)).getProcessorMetrics(Mockito.<ProcessorStatus>any()); + verify(metricsService, atLeast(1)).getJVMMetrics(Mockito.<VirtualMachineMetrics>any()); + } + + + //test updating metrics of processors + @Test + public void testUpdateMetricsProcessor() throws InitializationException, IOException { + MetricsService ms = new MetricsService(); + Map<String, Double> processorMetrics = ms.getProcessorMetrics(procStatus); + Map<String, String> tagsMap = ImmutableMap.of("env", "test"); + DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask(); + dataDogReportingTask.initialize(initContext); + dataDogReportingTask.setup(configurationContext); + dataDogReportingTask.updateMetrics(processorMetrics, Optional.of("sampleProcessor"), tagsMap); + + verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesReceivedLast5Minutes"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.sampleProcessor.ActiveThreads"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesWrittenLast5Minutes"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesReadLast5Minutes"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesSentLast5Minutes"), Mockito.<Gauge>any()); + } + + //test updating JMV metrics + @Test + public void testUpdateMetricsJVM() throws InitializationException, IOException { + MetricsService ms = new MetricsService(); + Map<String, Double> processorMetrics = ms.getJVMMetrics(virtualMachineMetrics); + Map<String, String> tagsMap = ImmutableMap.of("env", "test"); + + DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask(); + dataDogReportingTask.initialize(initContext); + dataDogReportingTask.setup(configurationContext); + + dataDogReportingTask.updateMetrics(processorMetrics, Optional.<String>absent(), tagsMap); + verify(metricRegistry).register(eq("nifi.flow.jvm.heap_usage"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_count"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.terminated"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.heap_used"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.runnable"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.timed_waiting"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.uptime"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.daemon_thread_count"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.file_descriptor_usage"), Mockito.<Gauge>any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.blocked"), Mockito.<Gauge>any()); + } + + + private void initProcessGroupStatus() { + status = new ProcessGroupStatus(); + status.setId("1234"); + status.setFlowFilesReceived(5); + status.setBytesReceived(10000); + status.setFlowFilesSent(10); + status.setBytesSent(20000); + status.setQueuedCount(100); + status.setQueuedContentSize(1024L); + status.setBytesRead(60000L); + status.setBytesWritten(80000L); + status.setActiveThreadCount(5); + status.setInputCount(2); + status.setOutputCount(4); + } + + private void initProcessorStatuses() { + procStatus = new ProcessorStatus(); + procStatus.setProcessingNanos(123456789); + procStatus.setInputCount(2); + procStatus.setOutputCount(4); + procStatus.setActiveThreadCount(6); + procStatus.setBytesSent(1256); + procStatus.setName("sampleProcessor"); + Collection<ProcessorStatus> processorStatuses = new ArrayList<>(); + processorStatuses.add(procStatus); + status.setProcessorStatus(processorStatuses); + + ProcessGroupStatus groupStatus = new ProcessGroupStatus(); + groupStatus.setProcessorStatus(processorStatuses); + + Collection<ProcessGroupStatus> groupStatuses = new ArrayList<>(); + groupStatuses.add(groupStatus); + status.setProcessGroupStatus(groupStatuses); + } + + private class TestableDataDogReportingTask extends DataDogReportingTask { + @Override + protected MetricsService getMetricsService() { + return metricsService; + } + + @Override + protected DDMetricRegistryBuilder getMetricRegistryBuilder() { + return new DDMetricRegistryBuilder(); + } + + @Override + protected MetricRegistry getMetricRegistry() { + return metricRegistry; + } + + @Override + protected ConcurrentHashMap<String, AtomicDouble> getMetricsMap() { + return metricsMap; + } + + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/376d3c4e/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java new file mode 100644 index 0000000..f50cbb1 --- /dev/null +++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestMetricsService.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.datadog; + +import com.yammer.metrics.core.VirtualMachineMetrics; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.reporting.datadog.metrics.MetricNames; +import org.apache.nifi.reporting.datadog.metrics.MetricsService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TestMetricsService { + + private ProcessGroupStatus status; + private MetricsService metricsService; + + @Before + public void init() { + status = new ProcessGroupStatus(); + metricsService = new MetricsService(); + status.setId("1234"); + status.setFlowFilesReceived(5); + status.setBytesReceived(10000); + status.setFlowFilesSent(10); + status.setBytesSent(20000); + status.setQueuedCount(100); + status.setQueuedContentSize(1024L); + status.setBytesRead(60000L); + status.setBytesWritten(80000L); + status.setActiveThreadCount(5); + } + + //test group status metric retreiveing + @Test + public void testGetProcessGroupStatusMetrics() { + ProcessorStatus procStatus = new ProcessorStatus(); + List<ProcessorStatus> processorStatuses = new ArrayList<>(); + processorStatuses.add(procStatus); + status.setProcessorStatus(processorStatuses); + + final Map<String, Double> metrics = metricsService.getDataFlowMetrics(status); + + Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_RECEIVED)); + Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_SENT)); + Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_QUEUED)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_QUEUED)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN)); + Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS)); + } + + //test processor status metric retreiveing + @Test + public void testGetProcessorGroupStatusMetrics() { + ProcessorStatus procStatus = new ProcessorStatus(); + List<ProcessorStatus> processorStatuses = new ArrayList<>(); + processorStatuses.add(procStatus); + status.setProcessorStatus(processorStatuses); + + final Map<String, Double> metrics = metricsService.getProcessorMetrics(procStatus); + + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_READ)); + Assert.assertTrue(metrics.containsKey(MetricNames.BYTES_WRITTEN)); + Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_RECEIVED)); + Assert.assertTrue(metrics.containsKey(MetricNames.FLOW_FILES_SENT)); + Assert.assertTrue(metrics.containsKey(MetricNames.ACTIVE_THREADS)); + } + + //test JVM status metric retreiveing + @Test + public void testGetVirtualMachineMetrics() { + final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance(); + + final Map<String, Double> metrics = metricsService.getJVMMetrics(virtualMachineMetrics); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_UPTIME)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USED)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_HEAP_USAGE)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_NON_HEAP_USAGE)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_RUNNABLE)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_BLOCKED)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TIMED_WAITING)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_STATES_TERMINATED)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_THREAD_COUNT)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_DAEMON_THREAD_COUNT)); + Assert.assertTrue(metrics.containsKey(MetricNames.JVM_FILE_DESCRIPTOR_USAGE)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/376d3c4e/nifi-nar-bundles/nifi-datadog-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-datadog-bundle/pom.xml b/nifi-nar-bundles/nifi-datadog-bundle/pom.xml new file mode 100644 index 0000000..eb6b87c --- /dev/null +++ b/nifi-nar-bundles/nifi-datadog-bundle/pom.xml @@ -0,0 +1,41 @@ +<?xml version="1.0"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-bundles</artifactId> + <version>1.0.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-datadog-bundle</artifactId> + <packaging>pom</packaging> + + <modules> + <module>nifi-datadog-reporting-task</module> + <module>nifi-datadog-nar</module> + </modules> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.glassfish.jersey.core</groupId> + <artifactId>jersey-client</artifactId> + <version>2.19</version> + </dependency> + </dependencies> + </dependencyManagement> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/376d3c4e/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml old mode 100644 new mode 100755 index 6ebd2da..f785c87 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -66,6 +66,7 @@ <module>nifi-evtx-bundle</module> <module>nifi-slack-bundle</module> <module>nifi-snmp-bundle</module> + <module>nifi-datadog-bundle</module> <module>nifi-windows-event-log-bundle</module> <module>nifi-ignite-bundle</module> <module>nifi-email-bundle</module>
