NIFI-4809 - Implement a SiteToSiteMetricsReportingTask address review comments
NIFI-4809 - Added Record Writer property added unit tests and additional details doc review comments Signed-off-by: Matthew Burgess <[email protected]> This closes #2430 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/930417b9 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/930417b9 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/930417b9 Branch: refs/heads/master Commit: 930417b9dcabb430df79c683acb9c927aa4f5b8b Parents: 844da06 Author: Pierre Villard <[email protected]> Authored: Tue Jan 23 23:15:18 2018 +0100 Committer: Matthew Burgess <[email protected]> Committed: Thu Mar 15 15:58:31 2018 -0400 ---------------------------------------------------------------------- .../nifi-ambari-reporting-task/pom.xml | 15 +- .../reporting/ambari/AmbariReportingTask.java | 4 +- .../reporting/ambari/api/MetricBuilder.java | 84 ------ .../nifi/reporting/ambari/api/MetricFields.java | 29 -- .../reporting/ambari/api/MetricsBuilder.java | 93 ------ .../reporting/ambari/metrics/MetricNames.java | 55 ---- .../ambari/metrics/MetricsService.java | 131 -------- .../ambari/api/TestMetricsBuilder.java | 2 + .../ambari/metrics/TestMetricsService.java | 2 + .../nifi-reporting-utils/pom.xml | 10 + .../reporting/util/metrics/MetricNames.java | 59 ++++ .../reporting/util/metrics/MetricsService.java | 230 ++++++++++++++ .../util/metrics/api/MetricBuilder.java | 84 ++++++ .../util/metrics/api/MetricFields.java | 29 ++ .../util/metrics/api/MetricsBuilder.java | 93 ++++++ .../nifi-site-to-site-reporting-task/pom.xml | 40 ++- .../AbstractSiteToSiteReportingTask.java | 103 ++++++- .../SiteToSiteBulletinReportingTask.java | 18 +- .../SiteToSiteMetricsReportingTask.java | 221 ++++++++++++++ .../SiteToSiteProvenanceReportingTask.java | 28 +- .../SiteToSiteStatusReportingTask.java | 37 +-- .../org.apache.nifi.reporting.ReportingTask | 3 +- .../additionalDetails.html | 178 +++++++++++ .../additionalDetails.html | 2 +- .../src/main/resources/schema-metrics.avsc | 37 +++ .../TestSiteToSiteMetricsReportingTask.java | 296 +++++++++++++++++++ 26 files changed, 1398 insertions(+), 485 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml index 5f30ec5..88f4a3b 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml @@ -30,21 +30,11 @@ <artifactId>jersey-client</artifactId> </dependency> <dependency> - <groupId>org.glassfish</groupId> - <artifactId>javax.json</artifactId> - <version>1.0.4</version> - </dependency> - <dependency> <groupId>javax.json</groupId> <artifactId>javax.json-api</artifactId> <version>1.0</version> </dependency> <dependency> - <groupId>com.yammer.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>2.2.0</version> - </dependency> - <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> </dependency> @@ -53,6 +43,11 @@ <artifactId>nifi-utils</artifactId> <version>1.6.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-reporting-utils</artifactId> + <version>1.6.0-SNAPSHOT</version> + </dependency> <!-- test dependencies --> <dependency> <groupId>org.apache.nifi</groupId> http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java index a5ce9f4..eadef74 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java @@ -28,8 +28,8 @@ import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.ambari.api.MetricsBuilder; -import org.apache.nifi.reporting.ambari.metrics.MetricsService; +import org.apache.nifi.reporting.util.metrics.MetricsService; +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; import org.apache.nifi.scheduling.SchedulingStrategy; import javax.json.Json; http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java deleted file mode 100644 index 8e234ce..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.api; - -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; - -/** - * Builds the JsonObject for an individual metric. - */ -public class MetricBuilder { - - private final JsonBuilderFactory factory; - - private String applicationId; - private String instanceId; - private String hostname; - private String timestamp; - private String metricName; - private String metricValue; - - public MetricBuilder(final JsonBuilderFactory factory) { - this.factory = factory; - } - - public MetricBuilder applicationId(final String applicationId) { - this.applicationId = applicationId; - return this; - } - - public MetricBuilder instanceId(final String instanceId) { - this.instanceId = instanceId; - return this; - } - - public MetricBuilder hostname(final String hostname) { - this.hostname = hostname; - return this; - } - - public MetricBuilder timestamp(final long timestamp) { - this.timestamp = String.valueOf(timestamp); - return this; - } - - public MetricBuilder metricName(final String metricName) { - this.metricName = metricName; - return this; - } - - public MetricBuilder metricValue(final String metricValue) { - this.metricValue = metricValue; - return this; - } - - public JsonObject build() { - return factory.createObjectBuilder() - .add(MetricFields.METRIC_NAME, metricName) - .add(MetricFields.APP_ID, applicationId) - .add(MetricFields.INSTANCE_ID, instanceId) - .add(MetricFields.HOSTNAME, hostname) - .add(MetricFields.TIMESTAMP, timestamp) - .add(MetricFields.START_TIME, timestamp) - .add(MetricFields.METRICS, - factory.createObjectBuilder() - .add(String.valueOf(timestamp), metricValue) - ).build(); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java deleted file mode 100644 index 1c1629c..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.api; - -public interface MetricFields { - - String METRIC_NAME = "metricname"; - String APP_ID = "appid"; - String INSTANCE_ID = "instanceid"; - String HOSTNAME = "hostname"; - String TIMESTAMP = "timestamp"; - String START_TIME = "starttime"; - String METRICS = "metrics"; - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java deleted file mode 100644 index 11b4db5..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.api; - -import javax.json.JsonArrayBuilder; -import javax.json.JsonBuilderFactory; -import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; -import java.util.HashMap; -import java.util.Map; - -/** - * Builds the overall JsonObject for the Metrics. - */ -public class MetricsBuilder { - - static final String ROOT_JSON_ELEMENT = "metrics"; - - private final JsonBuilderFactory factory; - - private long timestamp; - private String applicationId; - private String instanceId; - private String hostname; - private Map<String,String> metrics = new HashMap<>(); - - public MetricsBuilder(final JsonBuilderFactory factory) { - this.factory = factory; - } - - public MetricsBuilder applicationId(final String applicationId) { - this.applicationId = applicationId; - return this; - } - - public MetricsBuilder instanceId(final String instanceId) { - this.instanceId = instanceId; - return this; - } - - public MetricsBuilder hostname(final String hostname) { - this.hostname = hostname; - return this; - } - - public MetricsBuilder timestamp(final long timestamp) { - this.timestamp = timestamp; - return this; - } - - public MetricsBuilder metric(final String name, String value) { - this.metrics.put(name, value); - return this; - } - - public MetricsBuilder addAllMetrics(final Map<String,String> metrics) { - this.metrics.putAll(metrics); - return this; - } - - public JsonObject build() { - // builds JsonObject for individual metrics - final MetricBuilder metricBuilder = new MetricBuilder(factory); - metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname); - - final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder(); - - for (Map.Entry<String,String> entry : metrics.entrySet()) { - metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue()); - metricArrayBuilder.add(metricBuilder.build()); - } - - // add the array of metrics to a top-level json object - final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder(); - metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder); - return metricsBuilder.build(); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java deleted file mode 100644 index 20cfa4e..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.metrics; - -/** - * The Metric names to send to Ambari. - */ -public interface MetricNames { - - // Metric Name separator - String METRIC_NAME_SEPARATOR = "."; - - // NiFi Metrics - String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes"; - String BYTES_RECEIVED = "BytesReceivedLast5Minutes"; - String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes"; - String BYTES_SENT = "BytesSentLast5Minutes"; - String FLOW_FILES_QUEUED = "FlowFilesQueued"; - String BYTES_QUEUED = "BytesQueued"; - String BYTES_READ = "BytesReadLast5Minutes"; - String BYTES_WRITTEN = "BytesWrittenLast5Minutes"; - String ACTIVE_THREADS = "ActiveThreads"; - String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds"; - String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds"; - - // JVM Metrics - String JVM_UPTIME = "jvm.uptime"; - String JVM_HEAP_USED = "jvm.heap_used"; - String JVM_HEAP_USAGE = "jvm.heap_usage"; - String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage"; - String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable"; - String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked"; - String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting"; - String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated"; - String JVM_THREAD_COUNT = "jvm.thread_count"; - String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count"; - String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage"; - String JVM_GC_RUNS = "jvm.gc.runs"; - String JVM_GC_TIME = "jvm.gc.time"; - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java deleted file mode 100644 index cef257d..0000000 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.reporting.ambari.metrics; - -import com.yammer.metrics.core.VirtualMachineMetrics; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -/** - * A service used to produce key/value metrics based on a given input. - */ -public class MetricsService { - - /** - * Generates a Map of metrics for a ProcessGroupStatus instance. - * - * @param status a ProcessGroupStatus to get metrics from - * @param appendPgId if true, the process group ID will be appended at the end of the metric name - * @return a map of metrics for the given status - */ - public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) { - final Map<String,String> metrics = new HashMap<>(); - metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived())); - metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived())); - metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent())); - metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent())); - metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount())); - metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize())); - metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead())); - metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten())); - metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount())); - - final long durationNanos = calculateProcessingNanos(status); - metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos)); - - final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); - metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds)); - - return metrics; - } - - /** - * Generates a Map of metrics for VirtualMachineMetrics. - * - * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from - * @return a map of metrics from the given VirtualMachineStatus - */ - public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) { - final Map<String,String> metrics = new HashMap<>(); - metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime())); - metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed())); - metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage())); - metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage())); - metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount())); - metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount())); - metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage())); - - for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) { - final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue())); - switch(entry.getKey()) { - case BLOCKED: - metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue)); - break; - case RUNNABLE: - metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue)); - break; - case TERMINATED: - metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue)); - break; - case TIMED_WAITING: - metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue)); - break; - default: - break; - } - } - - for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) { - final String gcName = entry.getKey().replace(" ", ""); - final long runs = entry.getValue().getRuns(); - final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS); - metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs)); - metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS)); - } - - return metrics; - } - - // calculates the total processing time of all processors in nanos - protected long calculateProcessingNanos(final ProcessGroupStatus status) { - long nanos = 0L; - - for (final ProcessorStatus procStats : status.getProcessorStatus()) { - nanos += procStats.getProcessingNanos(); - } - - for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { - nanos += calculateProcessingNanos(childGroupStatus); - } - - return nanos; - } - - // append the process group ID if necessary - private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) { - if(appendPgId) { - return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId(); - } else { - return name; - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java index cdaa453..9b96eb9 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.reporting.ambari.api; +import org.apache.nifi.reporting.util.metrics.api.MetricFields; +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java index 93224eb..ec0cf6e 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java @@ -19,6 +19,8 @@ package org.apache.nifi.reporting.ambari.metrics; import com.yammer.metrics.core.VirtualMachineMetrics; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.reporting.util.metrics.MetricNames; +import org.apache.nifi.reporting.util.metrics.MetricsService; import org.junit.Assert; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml index e118271..e05b88b 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml @@ -40,6 +40,16 @@ <artifactId>commons-lang3</artifactId> <version>3.7</version> </dependency> + <dependency> + <groupId>com.yammer.metrics</groupId> + <artifactId>metrics-core</artifactId> + <version>2.2.0</version> + </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + <version>1.0.4</version> + </dependency> </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java new file mode 100644 index 0000000..19bb90d --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics; + +/** + * The Metric names to send to Ambari. + */ +public interface MetricNames { + + // Metric Name separator + String METRIC_NAME_SEPARATOR = "."; + + // NiFi Metrics + String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes"; + String BYTES_RECEIVED = "BytesReceivedLast5Minutes"; + String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes"; + String BYTES_SENT = "BytesSentLast5Minutes"; + String FLOW_FILES_QUEUED = "FlowFilesQueued"; + String BYTES_QUEUED = "BytesQueued"; + String BYTES_READ = "BytesReadLast5Minutes"; + String BYTES_WRITTEN = "BytesWrittenLast5Minutes"; + String ACTIVE_THREADS = "ActiveThreads"; + String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds"; + String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds"; + + // JVM Metrics + String JVM_UPTIME = "jvm.uptime"; + String JVM_HEAP_USED = "jvm.heap_used"; + String JVM_HEAP_USAGE = "jvm.heap_usage"; + String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage"; + String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable"; + String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked"; + String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting"; + String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated"; + String JVM_THREAD_COUNT = "jvm.thread_count"; + String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count"; + String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage"; + String JVM_GC_RUNS = "jvm.gc.runs"; + String JVM_GC_TIME = "jvm.gc.time"; + + // OS Metrics + String LOAD1MN = "loadAverage1min"; + String CORES = "availableCores"; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java new file mode 100644 index 0000000..ed3922a --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; + +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.reporting.util.metrics.api.MetricFields; + +import com.yammer.metrics.core.VirtualMachineMetrics; + +/** + * A service used to produce key/value metrics based on a given input. + */ +public class MetricsService { + + /** + * Generates a Map of metrics for a ProcessGroupStatus instance. + * + * @param status a ProcessGroupStatus to get metrics from + * @param appendPgId if true, the process group ID will be appended at the end of the metric name + * @return a map of metrics for the given status + */ + public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) { + final Map<String,String> metrics = new HashMap<>(); + + Map<String,Long> longMetrics = getLongMetrics(status, appendPgId); + for (String key : longMetrics.keySet()) { + metrics.put(key, String.valueOf(longMetrics.get(key))); + } + + Map<String,Integer> integerMetrics = getIntegerMetrics(status, appendPgId); + for (String key : integerMetrics.keySet()) { + metrics.put(key, String.valueOf(integerMetrics.get(key))); + } + + return metrics; + } + + private Map<String,Integer> getIntegerMetrics(ProcessGroupStatus status, boolean appendPgId) { + final Map<String,Integer> metrics = new HashMap<>(); + metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), status.getFlowFilesReceived()); + metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), status.getFlowFilesSent()); + metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), status.getQueuedCount()); + metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), status.getActiveThreadCount()); + return metrics; + } + + private Map<String,Long> getLongMetrics(ProcessGroupStatus status, boolean appendPgId) { + final Map<String,Long> metrics = new HashMap<>(); + metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), status.getBytesReceived()); + metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), status.getBytesSent()); + metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), status.getQueuedContentSize()); + metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), status.getBytesRead()); + metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), status.getBytesWritten()); + + final long durationNanos = calculateProcessingNanos(status); + metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), durationNanos); + + final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); + metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), durationSeconds); + + return metrics; + } + + /** + * Generates a Map of metrics for VirtualMachineMetrics. + * + * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from + * @return a map of metrics from the given VirtualMachineStatus + */ + public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map<String,String> metrics = new HashMap<>(); + + Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics); + for (String key : integerMetrics.keySet()) { + metrics.put(key, String.valueOf(integerMetrics.get(key))); + } + + Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics); + for (String key : longMetrics.keySet()) { + metrics.put(key, String.valueOf(longMetrics.get(key))); + } + + Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics); + for (String key : doubleMetrics.keySet()) { + metrics.put(key, String.valueOf(doubleMetrics.get(key))); + } + + return metrics; + } + + // calculates the total processing time of all processors in nanos + protected long calculateProcessingNanos(final ProcessGroupStatus status) { + long nanos = 0L; + + for (final ProcessorStatus procStats : status.getProcessorStatus()) { + nanos += procStats.getProcessingNanos(); + } + + for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) { + nanos += calculateProcessingNanos(childGroupStatus); + } + + return nanos; + } + + // append the process group ID if necessary + private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) { + if(appendPgId) { + return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId(); + } else { + return name; + } + } + + private Map<String,Double> getDoubleMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map<String,Double> metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_HEAP_USED, virtualMachineMetrics.heapUsed()); + metrics.put(MetricNames.JVM_HEAP_USAGE, virtualMachineMetrics.heapUsage()); + metrics.put(MetricNames.JVM_NON_HEAP_USAGE, virtualMachineMetrics.nonHeapUsage()); + metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, virtualMachineMetrics.fileDescriptorUsage()); + return metrics; + } + + private Map<String,Long> getLongMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map<String,Long> metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime()); + + for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) { + final String gcName = entry.getKey().replace(" ", ""); + final long runs = entry.getValue().getRuns(); + final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS); + metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, runs); + metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, timeMS); + } + + return metrics; + } + + private Map<String,Integer> getIntegerMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map<String,Integer> metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, virtualMachineMetrics.daemonThreadCount()); + metrics.put(MetricNames.JVM_THREAD_COUNT, virtualMachineMetrics.threadCount()); + + for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) { + final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue())); + switch(entry.getKey()) { + case BLOCKED: + metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, normalizedValue); + break; + case RUNNABLE: + metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, normalizedValue); + break; + case TERMINATED: + metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, normalizedValue); + break; + case TIMED_WAITING: + metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, normalizedValue); + break; + default: + break; + } + } + + return metrics; + } + + public JsonObject getMetrics(JsonBuilderFactory factory, ProcessGroupStatus status, VirtualMachineMetrics virtualMachineMetrics, + String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad) { + JsonObjectBuilder objectBuilder = factory.createObjectBuilder() + .add(MetricFields.APP_ID, applicationId) + .add(MetricFields.HOSTNAME, hostname) + .add(MetricFields.INSTANCE_ID, status.getId()) + .add(MetricFields.TIMESTAMP, currentTimeMillis); + + objectBuilder + .add(MetricNames.CORES, availableProcessors) + .add(MetricNames.LOAD1MN, systemLoad); + + Map<String,Integer> integerMetrics = getIntegerMetrics(virtualMachineMetrics); + for (String key : integerMetrics.keySet()) { + objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key)); + } + + Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics); + for (String key : longMetrics.keySet()) { + objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key)); + } + + Map<String,Double> doubleMetrics = getDoubleMetrics(virtualMachineMetrics); + for (String key : doubleMetrics.keySet()) { + objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key)); + } + + Map<String,Long> longPgMetrics = getLongMetrics(status, false); + for (String key : longPgMetrics.keySet()) { + objectBuilder.add(key, longPgMetrics.get(key)); + } + + Map<String,Integer> integerPgMetrics = getIntegerMetrics(status, false); + for (String key : integerPgMetrics.keySet()) { + objectBuilder.add(key, integerPgMetrics.get(key)); + } + + return objectBuilder.build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java new file mode 100644 index 0000000..81fb021 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics.api; + +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; + +/** + * Builds the JsonObject for an individual metric. + */ +public class MetricBuilder { + + private final JsonBuilderFactory factory; + + private String applicationId; + private String instanceId; + private String hostname; + private String timestamp; + private String metricName; + private String metricValue; + + public MetricBuilder(final JsonBuilderFactory factory) { + this.factory = factory; + } + + public MetricBuilder applicationId(final String applicationId) { + this.applicationId = applicationId; + return this; + } + + public MetricBuilder instanceId(final String instanceId) { + this.instanceId = instanceId; + return this; + } + + public MetricBuilder hostname(final String hostname) { + this.hostname = hostname; + return this; + } + + public MetricBuilder timestamp(final long timestamp) { + this.timestamp = String.valueOf(timestamp); + return this; + } + + public MetricBuilder metricName(final String metricName) { + this.metricName = metricName; + return this; + } + + public MetricBuilder metricValue(final String metricValue) { + this.metricValue = metricValue; + return this; + } + + public JsonObject build() { + return factory.createObjectBuilder() + .add(MetricFields.METRIC_NAME, metricName) + .add(MetricFields.APP_ID, applicationId) + .add(MetricFields.INSTANCE_ID, instanceId) + .add(MetricFields.HOSTNAME, hostname) + .add(MetricFields.TIMESTAMP, timestamp) + .add(MetricFields.START_TIME, timestamp) + .add(MetricFields.METRICS, + factory.createObjectBuilder() + .add(String.valueOf(timestamp), metricValue) + ).build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java new file mode 100644 index 0000000..4c451ea --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics.api; + +public interface MetricFields { + + String METRIC_NAME = "metricname"; + String APP_ID = "appid"; + String INSTANCE_ID = "instanceid"; + String HOSTNAME = "hostname"; + String TIMESTAMP = "timestamp"; + String START_TIME = "starttime"; + String METRICS = "metrics"; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java new file mode 100644 index 0000000..3694720 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting.util.metrics.api; + +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import java.util.HashMap; +import java.util.Map; + +/** + * Builds the overall JsonObject for the Metrics. + */ +public class MetricsBuilder { + + static final String ROOT_JSON_ELEMENT = "metrics"; + + private final JsonBuilderFactory factory; + + private long timestamp; + private String applicationId; + private String instanceId; + private String hostname; + private Map<String,String> metrics = new HashMap<>(); + + public MetricsBuilder(final JsonBuilderFactory factory) { + this.factory = factory; + } + + public MetricsBuilder applicationId(final String applicationId) { + this.applicationId = applicationId; + return this; + } + + public MetricsBuilder instanceId(final String instanceId) { + this.instanceId = instanceId; + return this; + } + + public MetricsBuilder hostname(final String hostname) { + this.hostname = hostname; + return this; + } + + public MetricsBuilder timestamp(final long timestamp) { + this.timestamp = timestamp; + return this; + } + + public MetricsBuilder metric(final String name, String value) { + this.metrics.put(name, value); + return this; + } + + public MetricsBuilder addAllMetrics(final Map<String,String> metrics) { + this.metrics.putAll(metrics); + return this; + } + + public JsonObject build() { + // builds JsonObject for individual metrics + final MetricBuilder metricBuilder = new MetricBuilder(factory); + metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname); + + final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder(); + + for (Map.Entry<String,String> entry : metrics.entrySet()) { + metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue()); + metricArrayBuilder.add(metricBuilder.build()); + } + + // add the array of metrics to a top-level json object + final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder(); + metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder); + return metricsBuilder.build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml index d883529..b664e57 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml @@ -55,6 +55,24 @@ <version>1.6.0-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-services</artifactId> + <version>1.6.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-avro-record-utils</artifactId> + <version>1.6.0-SNAPSHOT</version> + </dependency> + <dependency> <groupId>org.glassfish</groupId> <artifactId>javax.json</artifactId> <version>1.0.4</version> @@ -83,10 +101,30 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock-record-utils</artifactId> + <version>1.6.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>4.12</version> <scope>test</scope> </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/main/resources/schema-metrics.avsc</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java index 28106a6..c303203 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -16,6 +16,18 @@ */ package org.apache.nifi.reporting; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; +import javax.net.ssl.SSLContext; + import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; @@ -24,27 +36,39 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.json.JsonTreeRowRecordReader; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.StringUtils; -import javax.net.ssl.SSLContext; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * Base class for ReportingTasks that send data over site-to-site. */ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask { + + protected static final String LAST_EVENT_ID_KEY = "last_event_id"; protected static final String DESTINATION_URL_PATH = "/nifi"; + protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + + private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); + private final String timeFormat = RecordFieldType.TIME.getDefaultFormat(); + private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() .name("Destination URL") @@ -140,8 +164,16 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .sensitive(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records.") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(false) + .build(); protected volatile SiteToSiteClient siteToSiteClient; + protected volatile RecordSchema recordSchema; @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -187,7 +219,7 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue()); final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null : new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(), - context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue()); + context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue()); siteToSiteClient = new SiteToSiteClient.Builder() .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl)) @@ -214,6 +246,33 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT return this.siteToSiteClient; } + protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) { + try (final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), recordSchema, dateFormat, timeFormat, timestampFormat)) { + + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSchema writeSchema = writerFactory.getSchema(null, recordSchema); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) { + writer.beginRecordSet(); + + Record record; + while ((record = reader.nextRecord()) != null) { + writer.write(record); + } + + final WriteResult writeResult = writer.finishRecordSet(); + + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + } + + return out.toByteArray(); + } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("Failed to write metrics using record writer: " + e.getMessage(), e); + } + } + static class NiFiUrlValidator implements Validator { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { @@ -235,4 +294,34 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT } } } + + protected void addField(final JsonObjectBuilder builder, final String key, final Long value) { + if (value != null) { + builder.add(key, value.longValue()); + } + } + + protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) { + if (value != null) { + builder.add(key, value.intValue()); + } + } + + protected void addField(final JsonObjectBuilder builder, final String key, final String value) { + if (value == null) { + return; + } + + builder.add(key, value); + } + + protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) { + if (value == null) { + if (allowNullValues) { + builder.add(key, JsonValue.NULL); + } + } else { + builder.add(key, value); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index 03d8f3b..eddf4be 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -67,9 +67,6 @@ import java.util.concurrent.TimeUnit; @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask { - static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - static final String LAST_EVENT_ID_KEY = "last_event_id"; - static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() .name("Platform") .description("The value to use for the platform field in each provenance event.") @@ -194,7 +191,7 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting lastSentBulletinId = currMaxId; } - static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, + private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, final String platform, final String nodeIdentifier) { addField(builder, "objectId", UUID.randomUUID().toString()); @@ -215,17 +212,4 @@ public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReporting return builder.build(); } - private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { - if (value != null) { - builder.add(key, value.longValue()); - } - } - - private static void addField(final JsonObjectBuilder builder, final String key, final String value) { - if (value == null) { - return; - } - builder.add(key, value); - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java new file mode 100644 index 0000000..0bb7501 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.json.Json; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; + +import org.apache.avro.Schema; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.reporting.util.metrics.MetricNames; +import org.apache.nifi.reporting.util.metrics.MetricsService; +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; + +import com.yammer.metrics.core.VirtualMachineMetrics; + +@Tags({"status", "metrics", "site", "site to site"}) +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.") +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask { + + static final AllowableValue AMBARI_FORMAT = new AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted" + + " according to the Ambari Metrics API. See Additional Details in Usage documentation."); + static final AllowableValue RECORD_FORMAT = new AllowableValue("record-format", "Record Format", "Metrics will be formatted" + + " using the Record Writer property of this reporting task. See Additional Details in Usage documentation to" + + " have the description of the default schema."); + + static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder() + .name("s2s-metrics-application-id") + .displayName("Application ID") + .description("The Application ID to be included in the metrics") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("s2s-metrics-hostname") + .displayName("Hostname") + .description("The Hostname of this NiFi instance to be included in the metrics") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("${hostname(true)}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder() + .name("s2s-metrics-format") + .displayName("Output Format") + .description("The output format that will be used for the metrics. If " + RECORD_FORMAT.getDisplayName() + " is selected, " + + "a Record Writer must be provided. If " + AMBARI_FORMAT.getDisplayName() + " is selected, the Record Writer property " + + "should be empty.") + .required(true) + .allowableValues(AMBARI_FORMAT, RECORD_FORMAT) + .defaultValue(AMBARI_FORMAT.getValue()) + .addValidator(Validator.VALID) + .build(); + + private final MetricsService metricsService = new MetricsService(); + + public SiteToSiteMetricsReportingTask() throws IOException { + final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-metrics.avsc"); + recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema)); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(HOSTNAME); + properties.add(APPLICATION_ID); + properties.add(FORMAT); + properties.add(RECORD_WRITER); + properties.remove(BATCH_SIZE); + return properties; + } + + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext)); + + final boolean isWriterSet = validationContext.getProperty(RECORD_WRITER).isSet(); + if (validationContext.getProperty(FORMAT).getValue().equals(RECORD_FORMAT.getValue()) && !isWriterSet) { + problems.add(new ValidationResult.Builder() + .input("Record Writer") + .valid(false) + .explanation("If using " + RECORD_FORMAT.getDisplayName() + ", a record writer needs to be set.") + .build()); + } + if (validationContext.getProperty(FORMAT).getValue().equals(AMBARI_FORMAT.getValue()) && isWriterSet) { + problems.add(new ValidationResult.Builder() + .input("Record Writer") + .valid(false) + .explanation("If using " + AMBARI_FORMAT.getDisplayName() + ", no record writer should be set.") + .build()); + } + + return problems; + } + + @Override + public void onTrigger(final ReportingContext context) { + final boolean isClustered = context.isClustered(); + final String nodeId = context.getClusterNodeIdentifier(); + if (nodeId == null && isClustered) { + getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. " + + "Will wait for Node Identifier to be established."); + return; + } + + final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance(); + final Map<String, ?> config = Collections.emptyMap(); + final JsonBuilderFactory factory = Json.createBuilderFactory(config); + + final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue(); + final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + final ProcessGroupStatus status = context.getEventAccess().getControllerStatus(); + + if(status != null) { + final Map<String,String> statusMetrics = metricsService.getMetrics(status, false); + final Map<String,String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics); + + final MetricsBuilder metricsBuilder = new MetricsBuilder(factory); + final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); + final double systemLoad = os.getSystemLoadAverage(); + + byte[] data; + final Map<String, String> attributes = new HashMap<>(); + + if(context.getProperty(FORMAT).getValue().equals(AMBARI_FORMAT.getValue())) { + final JsonObject metricsObject = metricsBuilder + .applicationId(applicationId) + .instanceId(status.getId()) + .hostname(hostname) + .timestamp(System.currentTimeMillis()) + .addAllMetrics(statusMetrics) + .addAllMetrics(jvmMetrics) + .metric(MetricNames.CORES, String.valueOf(os.getAvailableProcessors())) + .metric(MetricNames.LOAD1MN, String.valueOf(systemLoad >= 0 ? systemLoad : -1)) + .build(); + + data = metricsObject.toString().getBytes(StandardCharsets.UTF_8); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + } else { + final JsonObject metricsObject = metricsService.getMetrics(factory, status, virtualMachineMetrics, applicationId, status.getId(), + hostname, System.currentTimeMillis(), os.getAvailableProcessors(), systemLoad >= 0 ? systemLoad : -1); + data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes); + } + + try { + long start = System.nanoTime(); + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + if (transaction == null) { + getLogger().debug("All destination nodes are penalized; will attempt to send data later"); + return; + } + + final String transactionId = UUID.randomUUID().toString(); + attributes.put("reporting.task.transaction.id", transactionId); + attributes.put("reporting.task.name", getName()); + attributes.put("reporting.task.uuid", getIdentifier()); + attributes.put("reporting.task.type", this.getClass().getSimpleName()); + + transaction.send(data, attributes); + transaction.confirm(); + transaction.complete(); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId}); + } catch (final Exception e) { + throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e); + } + + } else { + getLogger().error("No process group status to retrieve metrics"); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index f7a59db..fe407eb 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -45,7 +45,6 @@ import javax.json.JsonArrayBuilder; import javax.json.JsonBuilderFactory; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; -import javax.json.JsonValue; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -75,9 +74,6 @@ import java.util.concurrent.TimeUnit; ) public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask { - static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - static final String LAST_EVENT_ID_KEY = "last_event_id"; - static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream", "Start reading provenance Events from the beginning of the stream (the oldest event first)"); static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream", @@ -306,7 +302,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti } - static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df, + private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df, final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName, final String platform, final String nodeIdentifier) { addField(builder, "eventId", UUID.randomUUID().toString()); @@ -370,13 +366,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti builder.add(key, mapBuilder); } - private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { - if (value != null) { - builder.add(key, value.longValue()); - } - } - - private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) { + private void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection<String> values) { if (values == null) { return; } @@ -384,20 +374,6 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti builder.add(key, createJsonArray(factory, values)); } - private static void addField(final JsonObjectBuilder builder, final String key, final String value) { - addField(builder, key, value, false); - } - - private static void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) { - if (value == null) { - if (allowNullValues) { - builder.add(key, JsonValue.NULL); - } - } else { - builder.add(key, value); - } - } - private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection<String> values) { final JsonArrayBuilder builder = factory.createArrayBuilder(); for (final String value : values) {
