Repository: nifi Updated Branches: refs/heads/master 5930c0c21 -> 6201c06c9
NIFI-4392 Create a MetricReportingTask with GraphiteMetricService This closes #2171. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6201c06c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6201c06c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6201c06c Branch: refs/heads/master Commit: 6201c06c996746024249b1f448cea09d16dc7ba8 Parents: 5930c0c Author: Omer Hadari <[email protected]> Authored: Sun Sep 24 22:24:09 2017 +0300 Committer: Bryan Bende <[email protected]> Committed: Sun Oct 8 10:44:11 2017 -0400 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 10 + .../org/apache/nifi/util/MockEventAccess.java | 8 +- .../pom.xml | 40 +++ .../src/main/resources/META-INF/NOTICE | 25 ++ .../nifi-metrics-reporter-service-api/pom.xml | 27 ++ .../reporter/service/MetricReporterService.java | 40 +++ .../nifi-metrics-reporting-nar/pom.xml | 42 +++ .../src/main/resources/META-INF/NOTICE | 25 ++ .../nifi-metrics-reporting-task/pom.xml | 55 ++++ .../org/apache/nifi/metrics/FlowMetricSet.java | 95 +++++++ .../org/apache/nifi/metrics/MetricNames.java | 35 +++ .../service/GraphiteMetricReporterService.java | 180 +++++++++++++ .../reporting/task/MetricsReportingTask.java | 151 +++++++++++ ...org.apache.nifi.controller.ControllerService | 15 ++ .../org.apache.nifi.reporting.ReportingTask | 15 ++ .../GraphiteMetricReporterServiceTest.java | 211 +++++++++++++++ .../task/MetricsReportingTaskTest.java | 254 +++++++++++++++++++ .../nifi-metrics-reporting-bundle/pom.xml | 57 +++++ nifi-nar-bundles/pom.xml | 1 + pom.xml | 12 + 20 files changed, 1297 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git 
a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 108438b..7710b3d 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -516,6 +516,16 @@ <artifactId>nifi-redis-nar</artifactId> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-metrics-reporter-service-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-metrics-reporting-nar</artifactId> + <type>nar</type> + </dependency> </dependencies> <profiles> <profile> http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java index 2a2aab2..38d1619 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockEventAccess.java @@ -18,7 +18,9 @@ package org.apache.nifi.util; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.nifi.action.Action; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -31,10 +33,14 @@ public class MockEventAccess implements EventAccess { private ProcessGroupStatus processGroupStatus; private final List<ProvenanceEventRecord> provenanceRecords = new ArrayList<>(); private final List<Action> flowChanges = new ArrayList<>(); + private final Map<String, ProcessGroupStatus> processGroupStatusMap = new HashMap<>(); public void setProcessGroupStatus(final ProcessGroupStatus status) { this.processGroupStatus = status; } + public void setProcessGroupStatus(String groupId, final ProcessGroupStatus status) { + processGroupStatusMap.put(groupId, status); + } @Override public ProcessGroupStatus getControllerStatus() 
{ @@ -43,7 +49,7 @@ public class MockEventAccess implements EventAccess { @Override public ProcessGroupStatus getGroupStatus(final String groupId) { - return null; + return processGroupStatusMap.get(groupId); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml new file mode 100644 index 0000000..de48b81 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/pom.xml @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>nifi-metrics-reporting-bundle</artifactId> + <groupId>org.apache.nifi</groupId> + <version>1.5.0-SNAPSHOT</version> + </parent> + <packaging>nar</packaging> + <modelVersion>4.0.0</modelVersion> + + <artifactId>nifi-metrics-reporter-service-api-nar</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-metrics-reporter-service-api</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..76b99fd --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,25 @@ +nifi-metrics-reporter-service-api-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+ +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Dropwizard Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team + + This product includes software developed by Coda Hale and Yammer, Inc. + + This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64, + LongAdder), which was released with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml new file mode 100644 index 0000000..173d35c --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/pom.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>nifi-metrics-reporting-bundle</artifactId> + <groupId>org.apache.nifi</groupId> + <version>1.5.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>nifi-metrics-reporter-service-api</artifactId> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java new file mode 100644 index 0000000..d87be15 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporter-service-api/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/MetricReporterService.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics.reporting.reporter.service; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * An interface for controller services used by MetricsReportingTask. In order to report to a new + * client, implement this interface and make sure to return the desired implementation of {@link ScheduledReporter}. + * + * @author Omer Hadari + */ +public interface MetricReporterService extends ControllerService { + + /** + * Create a reporter for a metric client (e.g. graphite). + * + * @param metricRegistry registry with the metrics to report. + * @return an instance of the reporter. + * @throws ProcessException if there was an error creating the reporter. 
+ */ + ScheduledReporter createReporter(MetricRegistry metricRegistry) throws ProcessException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml new file mode 100644 index 0000000..f92b17b --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>nifi-metrics-reporting-bundle</artifactId> + <groupId>org.apache.nifi</groupId> + <version>1.5.0-SNAPSHOT</version> + </parent> + <packaging>nar</packaging> + <modelVersion>4.0.0</modelVersion> + + <artifactId>nifi-metrics-reporting-nar</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-metrics-reporting-task</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-metrics-reporter-service-api-nar</artifactId> + <version>1.5.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..504e3f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,25 @@ +nifi-metrics-reporting-nar +Copyright 2015-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). 
+ +****************** +Apache Software License v2 +****************** + +The following binary components are provided under the Apache Software License v2 + + (ASLv2) Dropwizard Metrics + The following NOTICE information applies: + Metrics + Copyright 2010-2013 Coda Hale and Yammer, Inc., 2014-2017 Dropwizard Team + + This product includes software developed by Coda Hale and Yammer, Inc. + + This product includes code derived from the JSR-166 project (ThreadLocalRandom, Striped64, + LongAdder), which was released with the following comments: + + Written by Doug Lea with assistance from members of JCP JSR-166 + Expert Group and released to the public domain, as explained at + http://creativecommons.org/publicdomain/zero/1.0/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml new file mode 100644 index 0000000..7b437bc --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/pom.xml @@ -0,0 +1,55 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>nifi-metrics-reporting-bundle</artifactId> + <groupId>org.apache.nifi</groupId> + <version>1.5.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>nifi-metrics-reporting-task</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-metrics-reporter-service-api</artifactId> + <version>1.5.0-SNAPSHOT</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + <version>3.1.2</version> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-jvm</artifactId> + <version>3.1.2</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java new file mode 100644 index 0000000..d6e2fba --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/FlowMetricSet.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A metric set of NiFi instance related metrics. + * + * @author Omer Hadari + */ +public class FlowMetricSet implements MetricSet { + + + /** + * Reference to the process status that should be reported. Should be updated when the status changes. 
+ */ + private final AtomicReference<ProcessGroupStatus> currentStatusReference; + + /** + * Create a metric set that will look at a given process status reference for deciding metrics. + * + * @param currentStatusReference a reference to the process status. + */ + public FlowMetricSet(AtomicReference<ProcessGroupStatus> currentStatusReference) { + this.currentStatusReference = currentStatusReference; + } + + /** + * Create a map of {@link Gauge}s for the {@link #currentStatusReference}. This method reports the metrics as + * found in the reference. + * + * @return map between the metric name and a {@link Gauge} to its value. + */ + @Override + public Map<String, Metric> getMetrics() { + + Map<String, Metric> metrics = new HashMap<>(); + + metrics.put(MetricNames.ACTIVE_THREADS, (Gauge<Integer>) () -> currentStatusReference.get().getActiveThreadCount()); + metrics.put(MetricNames.BYTES_QUEUED, (Gauge<Long>) () -> currentStatusReference.get().getQueuedContentSize()); + metrics.put(MetricNames.BYTES_READ, (Gauge<Long>) () -> currentStatusReference.get().getBytesRead()); + metrics.put(MetricNames.BYTES_RECEIVED, (Gauge<Long>) () -> currentStatusReference.get().getBytesReceived()); + metrics.put(MetricNames.BYTES_SENT, (Gauge<Long>) () -> currentStatusReference.get().getBytesSent()); + metrics.put(MetricNames.BYTES_WRITTEN, (Gauge<Long>) () -> currentStatusReference.get().getBytesWritten()); + metrics.put(MetricNames.FLOW_FILES_RECEIVED, (Gauge<Integer>) () -> currentStatusReference.get().getFlowFilesReceived()); + metrics.put(MetricNames.FLOW_FILES_QUEUED, (Gauge<Integer>) () -> currentStatusReference.get().getQueuedCount()); + metrics.put(MetricNames.FLOW_FILES_SENT, (Gauge<Integer>) () -> currentStatusReference.get().getFlowFilesSent()); + metrics.put(MetricNames.TOTAL_TASK_DURATION_NANOS, (Gauge<Long>) () -> calculateProcessingNanos(currentStatusReference.get())); + + return metrics; + } + + /** + * Calculate the total processing time of a process group. 
+ * + * @param status the current process group status. + * @return the total amount of nanoseconds spent in each processor in the process group. + */ + private long calculateProcessingNanos(final ProcessGroupStatus status) { + long nanos = 0L; + + for (final ProcessorStatus procStats : status.getProcessorStatus()) { + nanos += procStats.getProcessingNanos(); + } + + for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { + nanos += calculateProcessingNanos(childGroupStatus); + } + + return nanos; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java new file mode 100644 index 0000000..fa06b8b --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/MetricNames.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics; + +/** + * The names of the NiFi flow metrics reported by {@link FlowMetricSet}. + */ +public interface MetricNames { + + // NiFi Metrics + String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes"; + String BYTES_RECEIVED = "BytesReceivedLast5Minutes"; + String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes"; + String BYTES_SENT = "BytesSentLast5Minutes"; + String FLOW_FILES_QUEUED = "FlowFilesQueued"; + String BYTES_QUEUED = "BytesQueued"; + String BYTES_READ = "BytesReadLast5Minutes"; + String BYTES_WRITTEN = "BytesWrittenLast5Minutes"; + String ACTIVE_THREADS = "ActiveThreads"; + String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds"; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java new file mode 100644 index 0000000..55623ce --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterService.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics.reporting.reporter.service; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.graphite.GraphiteReporter; +import com.codahale.metrics.graphite.GraphiteSender; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.metrics.reporting.task.MetricsReportingTask; +import org.apache.nifi.processor.util.StandardValidators; + +import javax.net.SocketFactory; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A controller service that provides metric reporters for graphite, can be used by {@link MetricsReportingTask}. + * + * @author Omer Hadari + */ +@Tags({"metrics", "reporting", "graphite"}) +@CapabilityDescription("A controller service that provides metric reporters for graphite. 
" + + "Used by MetricsReportingTask.") +public class GraphiteMetricReporterService extends AbstractControllerService implements MetricReporterService { + + /** + * Points to the hostname of the graphite listener. + */ + public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() + .name("host") + .displayName("Host") + .description("The hostname of the carbon listener") + .required(true) + .addValidator(StandardValidators.URI_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + /** + * Points to the port on which the graphite server listens. + */ + public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("port") + .displayName("Port") + .description("The port on which carbon listens") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + /** + * Points to the charset name that the graphite server expects. + */ + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("charset") + .displayName("Charset") + .description("The charset used by the graphite server") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .build(); + + /** + * Prefix for all metric names sent by reporters - for separation of NiFi stats in graphite. + */ + protected static final PropertyDescriptor METRIC_NAME_PREFIX = new PropertyDescriptor.Builder() + .name("metric name prefix") + .displayName("Metric Name Prefix") + .description("A prefix that will be used for all metric names sent by reporters provided by this service.") + .required(true) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + /** + * List of property descriptors used by the service. 
+ */ + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(HOST); + props.add(PORT); + props.add(CHARSET); + props.add(METRIC_NAME_PREFIX); + properties = Collections.unmodifiableList(props); + } + + /** + * Graphite sender, a connection to the server. + */ + private GraphiteSender graphiteSender; + + /** + * The configured {@link #METRIC_NAME_PREFIX} value. + */ + private String metricNamePrefix; + + /** + * Create the {@link #graphiteSender} according to configuration. + * + * @param context used to access properties. + */ + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + String host = context.getProperty(HOST).evaluateAttributeExpressions().getValue(); + int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); + Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + graphiteSender = createSender(host, port, charset); + metricNamePrefix = context.getProperty(METRIC_NAME_PREFIX).evaluateAttributeExpressions().getValue(); + } + + /** + * Close the graphite sender. + * + * @throws IOException if failed to close the connection. + */ + @OnDisabled + public void shutdown() throws IOException { + try { + graphiteSender.close(); + } finally { + graphiteSender = null; + } + } + + /** + * Use the {@link #graphiteSender} in order to create a reporter. + * + * @param metricRegistry registry with the metrics to report. + * @return a reporter instance. + */ + @Override + public ScheduledReporter createReporter(MetricRegistry metricRegistry) { + return GraphiteReporter.forRegistry(metricRegistry).prefixedWith(metricNamePrefix).build(graphiteSender); + + } + + /** + * Create a sender. + * + * @param host the hostname of the server to connect to. + * @param port the port on which the server listens. + * @param charset the charset in which the server expects logs. + * @return The created sender. 
+ */ + protected GraphiteSender createSender(String host, int port, Charset charset) { + return new Graphite(host, port, SocketFactory.getDefault(), charset); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java new file mode 100644 index 0000000..37eb194 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTask.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.metrics.reporting.task; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.jvm.MemoryUsageGaugeSet; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.metrics.FlowMetricSet; +import org.apache.nifi.metrics.reporting.reporter.service.MetricReporterService; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A reporting task for NiFi instance and JVM related metrics. + * <p> + * This task reports metrics to services according to a provided {@link ScheduledReporter}, reached by using a + * {@link MetricReporterService}. In order to report to different clients, simply use different implementations of + * the controller service. + * + * @author Omer Hadari + * @see MetricReporterService + */ +@Tags({"metrics", "reporting"}) +@CapabilityDescription("This reporting task reports a set of metrics regarding the JVM and the NiFi instance" + + "to a reporter. The reporter is provided by a MetricReporterService. It can be optionally used for a specific" + + "process group if a property with the group id is provided.") +public class MetricsReportingTask extends AbstractReportingTask { + + /** + * Points to the service which provides {@link ScheduledReporter} instances. 
+ */ + protected static final PropertyDescriptor REPORTER_SERVICE = new PropertyDescriptor.Builder() + .name("metric reporter service") + .displayName("Metric Reporter Service") + .description("The service that provides a reporter for the gathered metrics") + .identifiesControllerService(MetricReporterService.class) + .required(true) + .build(); + + /** + * Metrics of the process group with this ID should be reported. If not specified, use the root process group. + */ + protected static final PropertyDescriptor PROCESS_GROUP_ID = new PropertyDescriptor.Builder() + .name("process group id") + .displayName("Process Group ID") + .description("The id of the process group to report. If not specified, metrics of the root process group" + + "are reported.") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + /** + * Contains the metrics that should be reported. + */ + private MetricRegistry metricRegistry; + + /** + * Used for actually reporting metrics. + */ + private ScheduledReporter reporter; + + // Protected for testing sake. DO NOT ACCESS FOR OTHER PURPOSES. + /** + * Points to the most recent process group status seen by this task. + */ + protected AtomicReference<ProcessGroupStatus> currentStatusReference; + + /** + * Register all wanted metrics to {@link #metricRegistry}. + * <p> + * {@inheritDoc} + */ + @Override + protected void init(ReportingInitializationContext config) { + metricRegistry = new MetricRegistry(); + currentStatusReference = new AtomicReference<>(); + metricRegistry.registerAll(new MemoryUsageGaugeSet()); + metricRegistry.registerAll(new FlowMetricSet(currentStatusReference)); + } + + /** + * Populate {@link #reporter} using the {@link MetricReporterService}. If the reporter is active already, + * do nothing. + * + * @param context used for accessing the controller service. 
+ */ + @OnScheduled + public void connect(ConfigurationContext context) { + if (reporter == null) { + reporter = ((MetricReporterService) context.getProperty(REPORTER_SERVICE).asControllerService()) + .createReporter(metricRegistry); + } + } + + /** + * Report the registered metrics. + * + * @param context used for getting the most recent {@link ProcessGroupStatus}. + */ + @Override + public void onTrigger(ReportingContext context) { + String groupId = context.getProperty(PROCESS_GROUP_ID).evaluateAttributeExpressions().getValue(); + + ProcessGroupStatus statusToReport = groupId == null + ? context.getEventAccess().getControllerStatus() + : context.getEventAccess().getGroupStatus(groupId); + + if (statusToReport != null) { + currentStatusReference.set(statusToReport); + reporter.report(); + } else { + getLogger().error("Process group with provided group id could not be found."); + } + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(REPORTER_SERVICE); + properties.add(PROCESS_GROUP_ID); + return Collections.unmodifiableList(properties); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..0ddc70a --- /dev/null +++ 
b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.metrics.reporting.reporter.service.GraphiteMetricReporterService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask new file mode 100644 index 0000000..8c554a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor 
license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.metrics.reporting.task.MetricsReportingTask http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java new file mode 100644 index 0000000..e7257a0 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/reporter/service/GraphiteMetricReporterServiceTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.metrics.reporting.reporter.service; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.graphite.GraphiteSender; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test class for {@link GraphiteMetricReporterService}. + * + * @author Omer Hadari + */ +@RunWith(MockitoJUnitRunner.class) +public class GraphiteMetricReporterServiceTest { + + /** + * Service identifier for registering the tested service to the tests runner. + */ + private static final String SERVICE_IDENTIFIER = "graphite-metric-reporter-service"; + + /** + * Sample host name for the {@link GraphiteMetricReporterService#HOST} property. + */ + private static final String TEST_HOST = "some-host"; + + /** + * Sample port for the {@link GraphiteMetricReporterService#PORT} property. 
+ */ + private static final int TEST_PORT = 12345; + + /** + * Sample charset for the {@link GraphiteMetricReporterService#CHARSET} property. + */ + private static final Charset TEST_CHARSET = StandardCharsets.UTF_16LE; + + /** + * Sample prefix for metric names. + */ + private static final String METRIC_NAMES_PREFIX = "test-metric-name-prefix"; + + /** + * Sample metric for verifying that a graphite sender with the correct configuration is used. + */ + private static final String TEST_METRIC_NAME = "test-metric"; + + /** + * The fixed value of {@link #TEST_METRIC_NAME}. + */ + private static final int TEST_METRIC_VALUE = 2; + + /** + * Dummy processor for creating {@link #runner}. + */ + @Mock + private Processor processorDummy; + + /** + * Mock sender for verifying creation with the correct configuration. + */ + @Mock + private GraphiteSender graphiteSenderMock; + + /** + * Stub metric registry, that contains the test metrics. + */ + private MetricRegistry metricRegistryStub; + + /** + * Test runner for activating and configuring the service. + */ + private TestRunner runner; + + /** + * The test subject. + */ + private GraphiteMetricReporterService testedService; + + /** + * Instantiate the runner and mocks between tests. Register metrics to the {@link #metricRegistryStub}. + */ + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(processorDummy); + testedService = new GraphiteMetricReporterService(); + + metricRegistryStub = new MetricRegistry(); + metricRegistryStub.register(TEST_METRIC_NAME, ((Gauge<Integer>) () -> TEST_METRIC_VALUE)); + + } + + + /** + * Make sure that a correctly configured service can be activated. 
+ */ + @Test + public void testGraphiteMetricReporterSanityConfiguration() throws Exception { + runner.addControllerService(SERVICE_IDENTIFIER, testedService); + setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX); + runner.enableControllerService(testedService); + + runner.assertValid(testedService); + } + + + /** + * Make sure that a correctly configured service provides a reporter for the matching configuration, and + * actually reports to the correct address. + */ + @Test + public void testCreateReporterUsesCorrectSender() throws Exception { + testedService = new TestableGraphiteMetricReporterService(); + runner.addControllerService(SERVICE_IDENTIFIER, testedService); + setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX); + when(graphiteSenderMock.isConnected()).thenReturn(false); + runner.enableControllerService(testedService); + + ScheduledReporter createdReporter = testedService.createReporter(metricRegistryStub); + createdReporter.report(); + + String expectedMetricName = MetricRegistry.name(METRIC_NAMES_PREFIX, TEST_METRIC_NAME); + verify(graphiteSenderMock).send(eq(expectedMetricName), eq(String.valueOf(TEST_METRIC_VALUE)), anyLong()); + } + + /** + * Make sure that {@link GraphiteMetricReporterService#shutdown()} closes the connection to graphite. + */ + @Test + public void testShutdownClosesSender() throws Exception { + testedService = new TestableGraphiteMetricReporterService(); + runner.addControllerService(SERVICE_IDENTIFIER, testedService); + setServiceProperties(TEST_HOST, TEST_PORT, TEST_CHARSET, METRIC_NAMES_PREFIX); + runner.enableControllerService(testedService); + runner.disableControllerService(testedService); + + verify(graphiteSenderMock).close(); + } + + /** + * Set the test subject's properties. + * + * @param host populates {@link GraphiteMetricReporterService#HOST}. + * @param port populates {@link GraphiteMetricReporterService#PORT}. 
+ * @param charset populates {@link GraphiteMetricReporterService#CHARSET}. + * @param metricNamesPrefix populates {@link GraphiteMetricReporterService#METRIC_NAME_PREFIX}. + */ + private void setServiceProperties(String host, int port, Charset charset, String metricNamesPrefix) { + runner.setProperty(testedService, GraphiteMetricReporterService.HOST, host); + runner.setProperty(testedService, GraphiteMetricReporterService.PORT, String.valueOf(port)); + runner.setProperty(testedService, GraphiteMetricReporterService.CHARSET, charset.name()); + runner.setProperty(testedService, GraphiteMetricReporterService.METRIC_NAME_PREFIX, metricNamesPrefix); + } + + /** + * This class is a patch. It overrides {@link GraphiteMetricReporterService#createSender(String, int, Charset)} + * so that it is possible to verify a correct creation of graphite senders according to property values. + */ + private class TestableGraphiteMetricReporterService extends GraphiteMetricReporterService { + + /** + * Overrides the actual methods in order to inject the mock {@link #graphiteSenderMock}. + * <p> + * If this method is called with the test property values, it returns the mock. Otherwise operate + * regularly. + * + * @param host the provided hostname. + * @param port the provided port. + * @param charset the provided graphite server charset. + * @return {@link #graphiteSenderMock} if all params were the constant test params, regular result otherwise. 
+ */ + @Override + protected GraphiteSender createSender(String host, int port, Charset charset) { + if (TEST_HOST.equals(host) && TEST_PORT == port && TEST_CHARSET.equals(charset)) { + return graphiteSenderMock; + + } + return super.createSender(host, port, charset); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java new file mode 100644 index 0000000..0619e73 --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/nifi-metrics-reporting-task/src/test/java/org/apache/nifi/metrics/reporting/task/MetricsReportingTaskTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.metrics.reporting.task; + +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.jvm.MemoryUsageGaugeSet; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.metrics.FlowMetricSet; +import org.apache.nifi.metrics.reporting.reporter.service.MetricReporterService; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingInitializationContext; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.nifi.util.MockReportingContext; +import org.apache.nifi.util.MockReportingInitializationContext; +import org.apache.nifi.util.MockVariableRegistry; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test class for {@link MetricsReportingTask}. + * + * @author Omer Hadari + */ +@RunWith(MockitoJUnitRunner.class) +public class MetricsReportingTaskTest { + + /** + * Identifier for {@link #reporterServiceStub}. 
+ */ + private static final String REPORTER_SERVICE_IDENTIFIER = "reporter-service"; + + /** + * Id for the group with status {@link #innerGroupStatus}. + */ + private static final String TEST_GROUP_ID = "test-process-group-id"; + + /** + * Id for the {@link #reportingInitContextStub}. + */ + private static final String TEST_INIT_CONTEXT_ID = "test-init-context-id"; + + /** + * Name for {@link #reportingInitContextStub}. + */ + private static final String TEST_INIT_CONTEXT_NAME = "test-init-context-name"; + + /** + * Id for the tested reporting task. + */ + private static final String TEST_TASK_ID = "test-task-id"; + + + /** + * Stub context, used by {@link MetricsReportingTask#onTrigger(ReportingContext)} for reaching the status. + */ + private MockReportingContext reportingContextStub; + + /** + * Stub context, used by {@link MetricsReportingTask#connect(ConfigurationContext)} for reaching the service. + */ + private MockConfigurationContext configurationContextStub; + + /** + * Stub service for providing {@link #reporterMock}, used for actual reporting. + */ + @Mock + private MetricReporterService reporterServiceStub; + + /** + * Mock reporter, used for verifying actual reporting. + */ + @Mock + private ScheduledReporter reporterMock; + + /** + * A status for the "root" process group. + */ + private ProcessGroupStatus rootGroupStatus; + + /** + * Same as {@link #rootGroupStatus}, used when {@link MetricsReportingTask#PROCESS_GROUP_ID} is set. + */ + private ProcessGroupStatus innerGroupStatus; + + /** + * Stub initialization context for calling {@link MetricsReportingTask#initialize(ReportingInitializationContext)}. + */ + private MockReportingInitializationContext reportingInitContextStub; + + /** + * The test subject. + */ + private MetricsReportingTask testedReportingTask; + + /** + * Set up the test environment and mock behaviour. 
This includes registering {@link #reporterServiceStub} in the + * different contexts, overriding {@link MetricsReportingTask#currentStatusReference} and instantiating the test + * subject. + */ + @Before + public void setUp() throws Exception { + Map<String, ControllerService> services = new HashMap<>(); + services.put(REPORTER_SERVICE_IDENTIFIER, reporterServiceStub); + testedReportingTask = new MetricsReportingTask(); + reportingContextStub = new MockReportingContext( + services, new MockStateManager(testedReportingTask), new MockVariableRegistry()); + + rootGroupStatus = new ProcessGroupStatus(); + innerGroupStatus = new ProcessGroupStatus(); + when(reporterServiceStub.createReporter(any())).thenReturn(reporterMock); + when(reporterServiceStub.getIdentifier()).thenReturn(REPORTER_SERVICE_IDENTIFIER); + reportingContextStub.setProperty(MetricsReportingTask.REPORTER_SERVICE.getName(), REPORTER_SERVICE_IDENTIFIER); + reportingContextStub.addControllerService(reporterServiceStub, REPORTER_SERVICE_IDENTIFIER); + + configurationContextStub = new MockConfigurationContext(reportingContextStub.getProperties(), + reportingContextStub.getControllerServiceLookup()); + reportingInitContextStub = new MockReportingInitializationContext( + TEST_INIT_CONTEXT_ID, + TEST_INIT_CONTEXT_NAME, + new MockComponentLog(TEST_TASK_ID, testedReportingTask)); + } + + /** + * Make sure that in a single life cycle the correct metrics are registered, the correct {@link ProcessGroupStatus} + * is used and that metrics are actually reported. 
+ */ + @Test + public void testValidLifeCycleReportsCorrectly() throws Exception { + reportingContextStub.getEventAccess().setProcessGroupStatus(rootGroupStatus); + + testedReportingTask.initialize(reportingInitContextStub); + testedReportingTask.connect(configurationContextStub); + testedReportingTask.onTrigger(reportingContextStub); + verify(reporterMock).report(); + + // Verify correct metrics are registered + ArgumentCaptor<MetricRegistry> registryCaptor = ArgumentCaptor.forClass(MetricRegistry.class); + verify(reporterServiceStub).createReporter(registryCaptor.capture()); + MetricRegistry usedRegistry = registryCaptor.getValue(); + Map<String, Metric> usedMetrics = usedRegistry.getMetrics(); + assertTrue(usedMetrics.keySet().containsAll(new MemoryUsageGaugeSet().getMetrics().keySet())); + assertTrue(usedMetrics.keySet() + .containsAll(new FlowMetricSet(testedReportingTask.currentStatusReference).getMetrics().keySet())); + + // Verify the most current ProcessGroupStatus is updated + assertEquals(testedReportingTask.currentStatusReference.get(), rootGroupStatus); + } + + /** + * Make sure that in a single life cycle the correct metrics are registered, the correct {@link ProcessGroupStatus} + * is used and that metrics are actually reported. 
+ */ + @Test + public void testValidLifeCycleReportsCorrectlyProcessGroupSpecified() throws Exception { + reportingContextStub.setProperty(MetricsReportingTask.PROCESS_GROUP_ID.getName(), TEST_GROUP_ID); + reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP_ID, innerGroupStatus); + + testedReportingTask.initialize(reportingInitContextStub); + testedReportingTask.connect(configurationContextStub); + testedReportingTask.onTrigger(reportingContextStub); + verify(reporterMock).report(); + + // Verify correct metrics are registered + ArgumentCaptor<MetricRegistry> registryCaptor = ArgumentCaptor.forClass(MetricRegistry.class); + verify(reporterServiceStub).createReporter(registryCaptor.capture()); + MetricRegistry usedRegistry = registryCaptor.getValue(); + Map<String, Metric> usedMetrics = usedRegistry.getMetrics(); + assertTrue(usedMetrics.keySet().containsAll(new MemoryUsageGaugeSet().getMetrics().keySet())); + assertTrue(usedMetrics.keySet() + .containsAll(new FlowMetricSet(testedReportingTask.currentStatusReference).getMetrics().keySet())); + + // Verify the most current ProcessGroupStatus is updated + assertEquals(testedReportingTask.currentStatusReference.get(), innerGroupStatus); + } + + /** + * Make sure that when the configured process group id does not match any known group, nothing is reported + * and the current {@link ProcessGroupStatus} reference is left unset. 
+ */ + @Test + public void testInvalidProcessGroupId() throws Exception { + reportingContextStub.setProperty(MetricsReportingTask.PROCESS_GROUP_ID.getName(), TEST_GROUP_ID + "-invalid"); + reportingContextStub.getEventAccess().setProcessGroupStatus(TEST_GROUP_ID, innerGroupStatus); + + testedReportingTask.initialize(reportingInitContextStub); + testedReportingTask.connect(configurationContextStub); + testedReportingTask.onTrigger(reportingContextStub); + verify(reporterMock, never()).report(); + assertNull(testedReportingTask.currentStatusReference.get()); + } + + /** + * Make sure that {@link MetricsReportingTask#connect(ConfigurationContext)} does not create a new reporter + * if there is already an active reporter. + */ + @Test + public void testConnectCreatesSingleReporter() throws Exception { + testedReportingTask.initialize(reportingInitContextStub); + testedReportingTask.connect(configurationContextStub); + testedReportingTask.connect(configurationContextStub); + + verify(reporterServiceStub, times(1)).createReporter(any()); + } + + /** + * Sanity check for registered properties. 
+ */ + @Test + public void testGetSupportedPropertyDescriptorsSanity() throws Exception { + List<PropertyDescriptor> expected = Arrays.asList( + MetricsReportingTask.REPORTER_SERVICE, + MetricsReportingTask.PROCESS_GROUP_ID); + assertEquals(expected, testedReportingTask.getSupportedPropertyDescriptors()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml b/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml new file mode 100644 index 0000000..2006ebf --- /dev/null +++ b/nifi-nar-bundles/nifi-metrics-reporting-bundle/pom.xml @@ -0,0 +1,57 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>nifi-nar-bundles</artifactId> + <groupId>org.apache.nifi</groupId> + <version>1.5.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>nifi-metrics-reporting-bundle</artifactId> + <packaging>pom</packaging> + <modules> + <module>nifi-metrics-reporting-task</module> + <module>nifi-metrics-reporting-nar</module> + <module>nifi-metrics-reporter-service-api</module> + <module>nifi-metrics-reporter-service-api-nar</module> + </modules> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-metrics-reporting-task</artifactId> + <version>1.5.0-SNAPSHOT</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>3.1.2</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 0771c62..330daba 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -88,6 +88,7 @@ <module>nifi-extension-utils</module> <module>nifi-grpc-bundle</module> <module>nifi-redis-bundle</module> + <module>nifi-metrics-reporting-bundle</module> </modules> <build> http://git-wip-us.apache.org/repos/asf/nifi/blob/6201c06c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 56961ec..ab46861 100644 --- a/pom.xml +++ 
b/pom.xml @@ -1489,6 +1489,18 @@ <version>1.5.0-SNAPSHOT</version> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-metrics-reporter-service-api-nar</artifactId> + <version>1.5.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-metrics-reporting-nar</artifactId> + <version>1.5.0-SNAPSHOT</version> + <type>nar</type> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-properties</artifactId>
