NIFI-4809 - Implement a SiteToSiteMetricsReportingTask

address review comments

NIFI-4809 - Added Record Writer property

added unit tests and additional details doc

review comments

Signed-off-by: Matthew Burgess <[email protected]>

This closes #2430


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/930417b9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/930417b9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/930417b9

Branch: refs/heads/master
Commit: 930417b9dcabb430df79c683acb9c927aa4f5b8b
Parents: 844da06
Author: Pierre Villard <[email protected]>
Authored: Tue Jan 23 23:15:18 2018 +0100
Committer: Matthew Burgess <[email protected]>
Committed: Thu Mar 15 15:58:31 2018 -0400

----------------------------------------------------------------------
 .../nifi-ambari-reporting-task/pom.xml          |  15 +-
 .../reporting/ambari/AmbariReportingTask.java   |   4 +-
 .../reporting/ambari/api/MetricBuilder.java     |  84 ------
 .../nifi/reporting/ambari/api/MetricFields.java |  29 --
 .../reporting/ambari/api/MetricsBuilder.java    |  93 ------
 .../reporting/ambari/metrics/MetricNames.java   |  55 ----
 .../ambari/metrics/MetricsService.java          | 131 --------
 .../ambari/api/TestMetricsBuilder.java          |   2 +
 .../ambari/metrics/TestMetricsService.java      |   2 +
 .../nifi-reporting-utils/pom.xml                |  10 +
 .../reporting/util/metrics/MetricNames.java     |  59 ++++
 .../reporting/util/metrics/MetricsService.java  | 230 ++++++++++++++
 .../util/metrics/api/MetricBuilder.java         |  84 ++++++
 .../util/metrics/api/MetricFields.java          |  29 ++
 .../util/metrics/api/MetricsBuilder.java        |  93 ++++++
 .../nifi-site-to-site-reporting-task/pom.xml    |  40 ++-
 .../AbstractSiteToSiteReportingTask.java        | 103 ++++++-
 .../SiteToSiteBulletinReportingTask.java        |  18 +-
 .../SiteToSiteMetricsReportingTask.java         | 221 ++++++++++++++
 .../SiteToSiteProvenanceReportingTask.java      |  28 +-
 .../SiteToSiteStatusReportingTask.java          |  37 +--
 .../org.apache.nifi.reporting.ReportingTask     |   3 +-
 .../additionalDetails.html                      | 178 +++++++++++
 .../additionalDetails.html                      |   2 +-
 .../src/main/resources/schema-metrics.avsc      |  37 +++
 .../TestSiteToSiteMetricsReportingTask.java     | 296 +++++++++++++++++++
 26 files changed, 1398 insertions(+), 485 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
index 5f30ec5..88f4a3b 100644
--- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
+++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml
@@ -30,21 +30,11 @@
             <artifactId>jersey-client</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.glassfish</groupId>
-            <artifactId>javax.json</artifactId>
-            <version>1.0.4</version>
-        </dependency>
-        <dependency>
             <groupId>javax.json</groupId>
             <artifactId>javax.json-api</artifactId>
             <version>1.0</version>
         </dependency>
         <dependency>
-            <groupId>com.yammer.metrics</groupId>
-            <artifactId>metrics-core</artifactId>
-            <version>2.2.0</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-api</artifactId>
         </dependency>
@@ -53,6 +43,11 @@
             <artifactId>nifi-utils</artifactId>
             <version>1.6.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-reporting-utils</artifactId>
+            <version>1.6.0-SNAPSHOT</version>
+        </dependency>
         <!-- test dependencies -->
         <dependency>
             <groupId>org.apache.nifi</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
index a5ce9f4..eadef74 100644
--- 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java
@@ -28,8 +28,8 @@ import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.AbstractReportingTask;
 import org.apache.nifi.reporting.ReportingContext;
-import org.apache.nifi.reporting.ambari.api.MetricsBuilder;
-import org.apache.nifi.reporting.ambari.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 
 import javax.json.Json;

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
deleted file mode 100644
index 8e234ce..0000000
--- 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-import javax.json.JsonBuilderFactory;
-import javax.json.JsonObject;
-
-/**
- * Builds the JsonObject for an individual metric.
- */
-public class MetricBuilder {
-
-    private final JsonBuilderFactory factory;
-
-    private String applicationId;
-    private String instanceId;
-    private String hostname;
-    private String timestamp;
-    private String metricName;
-    private String metricValue;
-
-    public MetricBuilder(final JsonBuilderFactory factory) {
-        this.factory = factory;
-    }
-
-    public MetricBuilder applicationId(final String applicationId) {
-        this.applicationId = applicationId;
-        return this;
-    }
-
-    public MetricBuilder instanceId(final String instanceId) {
-        this.instanceId = instanceId;
-        return this;
-    }
-
-    public MetricBuilder hostname(final String hostname) {
-        this.hostname = hostname;
-        return this;
-    }
-
-    public MetricBuilder timestamp(final long timestamp) {
-        this.timestamp = String.valueOf(timestamp);
-        return this;
-    }
-
-    public MetricBuilder metricName(final String metricName) {
-        this.metricName = metricName;
-        return this;
-    }
-
-    public MetricBuilder metricValue(final String metricValue) {
-        this.metricValue = metricValue;
-        return this;
-    }
-
-    public JsonObject build() {
-        return factory.createObjectBuilder()
-                .add(MetricFields.METRIC_NAME, metricName)
-                .add(MetricFields.APP_ID, applicationId)
-                .add(MetricFields.INSTANCE_ID, instanceId)
-                .add(MetricFields.HOSTNAME, hostname)
-                .add(MetricFields.TIMESTAMP, timestamp)
-                .add(MetricFields.START_TIME, timestamp)
-                .add(MetricFields.METRICS,
-                        factory.createObjectBuilder()
-                                .add(String.valueOf(timestamp), metricValue)
-                ).build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
deleted file mode 100644
index 1c1629c..0000000
--- 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-public interface MetricFields {
-
-    String METRIC_NAME = "metricname";
-    String APP_ID = "appid";
-    String INSTANCE_ID = "instanceid";
-    String HOSTNAME = "hostname";
-    String TIMESTAMP = "timestamp";
-    String START_TIME = "starttime";
-    String METRICS = "metrics";
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
deleted file mode 100644
index 11b4db5..0000000
--- 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.api;
-
-import javax.json.JsonArrayBuilder;
-import javax.json.JsonBuilderFactory;
-import javax.json.JsonObject;
-import javax.json.JsonObjectBuilder;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Builds the overall JsonObject for the Metrics.
- */
-public class MetricsBuilder {
-
-    static final String ROOT_JSON_ELEMENT = "metrics";
-
-    private final JsonBuilderFactory factory;
-
-    private long timestamp;
-    private String applicationId;
-    private String instanceId;
-    private String hostname;
-    private Map<String,String> metrics = new HashMap<>();
-
-    public MetricsBuilder(final JsonBuilderFactory factory) {
-        this.factory = factory;
-    }
-
-    public MetricsBuilder applicationId(final String applicationId) {
-        this.applicationId = applicationId;
-        return this;
-    }
-
-    public MetricsBuilder instanceId(final String instanceId) {
-        this.instanceId = instanceId;
-        return this;
-    }
-
-    public MetricsBuilder hostname(final String hostname) {
-        this.hostname = hostname;
-        return this;
-    }
-
-    public MetricsBuilder timestamp(final long timestamp) {
-        this.timestamp = timestamp;
-        return this;
-    }
-
-    public MetricsBuilder metric(final String name, String value) {
-        this.metrics.put(name, value);
-        return this;
-    }
-
-    public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
-        this.metrics.putAll(metrics);
-        return this;
-    }
-
-    public JsonObject build() {
-        // builds JsonObject for individual metrics
-        final MetricBuilder metricBuilder = new MetricBuilder(factory);
-        metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
-
-        final JsonArrayBuilder metricArrayBuilder = factory.createArrayBuilder();
-
-        for (Map.Entry<String,String> entry : metrics.entrySet()) {
-            metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
-            metricArrayBuilder.add(metricBuilder.build());
-        }
-
-        // add the array of metrics to a top-level json object
-        final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
-        metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
-        return metricsBuilder.build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
deleted file mode 100644
index 20cfa4e..0000000
--- 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.metrics;
-
-/**
- * The Metric names to send to Ambari.
- */
-public interface MetricNames {
-
-    // Metric Name separator
-    String METRIC_NAME_SEPARATOR = ".";
-
-    // NiFi Metrics
-    String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
-    String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
-    String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
-    String BYTES_SENT = "BytesSentLast5Minutes";
-    String FLOW_FILES_QUEUED = "FlowFilesQueued";
-    String BYTES_QUEUED = "BytesQueued";
-    String BYTES_READ = "BytesReadLast5Minutes";
-    String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
-    String ACTIVE_THREADS = "ActiveThreads";
-    String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
-    String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
-
-    // JVM Metrics
-    String JVM_UPTIME = "jvm.uptime";
-    String JVM_HEAP_USED = "jvm.heap_used";
-    String JVM_HEAP_USAGE = "jvm.heap_usage";
-    String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
-    String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
-    String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
-    String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
-    String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
-    String JVM_THREAD_COUNT = "jvm.thread_count";
-    String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
-    String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
-    String JVM_GC_RUNS = "jvm.gc.runs";
-    String JVM_GC_TIME = "jvm.gc.time";
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
deleted file mode 100644
index cef257d..0000000
--- 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.reporting.ambari.metrics;
-
-import com.yammer.metrics.core.VirtualMachineMetrics;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-/**
- * A service used to produce key/value metrics based on a given input.
- */
-public class MetricsService {
-
-    /**
-     * Generates a Map of metrics for a ProcessGroupStatus instance.
-     *
-     * @param status a ProcessGroupStatus to get metrics from
-     * @param appendPgId if true, the process group ID will be appended at the end of the metric name
-     * @return a map of metrics for the given status
-     */
-    public Map<String,String> getMetrics(ProcessGroupStatus status, boolean appendPgId) {
-        final Map<String,String> metrics = new HashMap<>();
-        metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived()));
-        metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived()));
-        metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent()));
-        metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent()));
-        metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount()));
-        metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize()));
-        metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead()));
-        metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten()));
-        metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount()));
-
-        final long durationNanos = calculateProcessingNanos(status);
-        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos));
-
-        final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS);
-        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds));
-
-        return metrics;
-    }
-
-    /**
-     * Generates a Map of metrics for VirtualMachineMetrics.
-     *
-     * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
-     * @return a map of metrics from the given VirtualMachineStatus
-     */
-    public Map<String,String> getMetrics(VirtualMachineMetrics virtualMachineMetrics) {
-        final Map<String,String> metrics = new HashMap<>();
-        metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime()));
-        metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed()));
-        metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage()));
-        metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage()));
-        metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount()));
-        metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount()));
-        metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage()));
-
-        for (Map.Entry<Thread.State,Double> entry : virtualMachineMetrics.threadStatePercentages().entrySet()) {
-            final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue()));
-            switch(entry.getKey()) {
-                case BLOCKED:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue));
-                    break;
-                case RUNNABLE:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue));
-                    break;
-                case TERMINATED:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue));
-                    break;
-                case TIMED_WAITING:
-                    metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue));
-                    break;
-                default:
-                    break;
-            }
-        }
-
-        for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
-            final String gcName = entry.getKey().replace(" ", "");
-            final long runs = entry.getValue().getRuns();
-            final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS);
-            metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs));
-            metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS));
-        }
-
-        return metrics;
-    }
-
-    // calculates the total processing time of all processors in nanos
-    protected long calculateProcessingNanos(final ProcessGroupStatus status) {
-        long nanos = 0L;
-
-        for (final ProcessorStatus procStats : status.getProcessorStatus()) {
-            nanos += procStats.getProcessingNanos();
-        }
-
-        for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
-            nanos += calculateProcessingNanos(childGroupStatus);
-        }
-
-        return nanos;
-    }
-
-    // append the process group ID if necessary
-    private String appendPgId(String name, ProcessGroupStatus status, boolean appendPgId) {
-        if(appendPgId) {
-            return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
-        } else {
-            return name;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
index cdaa453..9b96eb9 100644
--- 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
+++ 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.reporting.ambari.api;
 
+import org.apache.nifi.reporting.util.metrics.api.MetricFields;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
index 93224eb..ec0cf6e 100644
--- 
a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
+++ 
b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java
@@ -19,6 +19,8 @@ package org.apache.nifi.reporting.ambari.metrics;
 import com.yammer.metrics.core.VirtualMachineMetrics;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.util.metrics.MetricNames;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
index e118271..e05b88b 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml
@@ -40,6 +40,16 @@
             <artifactId>commons-lang3</artifactId>
             <version>3.7</version>
         </dependency>
+        <dependency>
+            <groupId>com.yammer.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.json</artifactId>
+            <version>1.0.4</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
new file mode 100644
index 0000000..19bb90d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics;
+
+/**
+ * The names of the metrics produced by the shared reporting utilities
+ * (this interface was originally part of the Ambari reporting task).
+ */
+public interface MetricNames {
+
+    // Metric Name separator, placed between a metric name and a process group ID
+    String METRIC_NAME_SEPARATOR = ".";
+
+    // NiFi Metrics, read from a ProcessGroupStatus
+    String FLOW_FILES_RECEIVED = "FlowFilesReceivedLast5Minutes";
+    String BYTES_RECEIVED = "BytesReceivedLast5Minutes";
+    String FLOW_FILES_SENT = "FlowFilesSentLast5Minutes";
+    String BYTES_SENT = "BytesSentLast5Minutes";
+    String FLOW_FILES_QUEUED = "FlowFilesQueued";
+    String BYTES_QUEUED = "BytesQueued";
+    String BYTES_READ = "BytesReadLast5Minutes";
+    String BYTES_WRITTEN = "BytesWrittenLast5Minutes";
+    String ACTIVE_THREADS = "ActiveThreads";
+    String TOTAL_TASK_DURATION_SECONDS = "TotalTaskDurationSeconds";
+    String TOTAL_TASK_DURATION_NANOS = "TotalTaskDurationNanoSeconds";
+
+    // JVM Metrics (dotted names; MetricsService strips the dots when it builds its flat JSON object)
+    String JVM_UPTIME = "jvm.uptime";
+    String JVM_HEAP_USED = "jvm.heap_used";
+    String JVM_HEAP_USAGE = "jvm.heap_usage";
+    String JVM_NON_HEAP_USAGE = "jvm.non_heap_usage";
+    String JVM_THREAD_STATES_RUNNABLE = "jvm.thread_states.runnable";
+    String JVM_THREAD_STATES_BLOCKED = "jvm.thread_states.blocked";
+    String JVM_THREAD_STATES_TIMED_WAITING = "jvm.thread_states.timed_waiting";
+    String JVM_THREAD_STATES_TERMINATED = "jvm.thread_states.terminated";
+    String JVM_THREAD_COUNT = "jvm.thread_count";
+    String JVM_DAEMON_THREAD_COUNT = "jvm.daemon_thread_count";
+    String JVM_FILE_DESCRIPTOR_USAGE = "jvm.file_descriptor_usage";
+    // prefixes: the garbage collector name is appended as ".<name>"
+    String JVM_GC_RUNS = "jvm.gc.runs";
+    String JVM_GC_TIME = "jvm.gc.time";
+
+    // OS Metrics
+    String LOAD1MN = "loadAverage1min";
+    String CORES = "availableCores";
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
new file mode 100644
index 0000000..ed3922a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.reporting.util.metrics.api.MetricFields;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+
+/**
+ * A service used to produce key/value metrics based on a given input.
+ */
+public class MetricsService {
+
+    /**
+     * Generates a Map of metrics for a ProcessGroupStatus instance. Every value
+     * is converted to its String representation.
+     *
+     * @param status a ProcessGroupStatus to get metrics from
+     * @param appendPgId if true, the process group ID will be appended at the end of the metric name
+     * @return a map of metrics for the given status
+     */
+    public Map<String,String> getMetrics(ProcessGroupStatus status, boolean 
appendPgId) {
+        final Map<String,String> metrics = new HashMap<>();
+
+        // long-valued metrics: byte counts and total task durations
+        Map<String,Long> longMetrics = getLongMetrics(status, appendPgId);
+        for (String key : longMetrics.keySet()) {
+            metrics.put(key, String.valueOf(longMetrics.get(key)));
+        }
+
+        // integer-valued metrics: flow file counts and active threads
+        Map<String,Integer> integerMetrics = getIntegerMetrics(status, 
appendPgId);
+        for (String key : integerMetrics.keySet()) {
+            metrics.put(key, String.valueOf(integerMetrics.get(key)));
+        }
+
+        return metrics;
+    }
+
+    // collects the integer-valued process group metrics (flow files received, sent
+    // and queued, and the active thread count), optionally suffixing each name with the group ID
+    private Map<String,Integer> getIntegerMetrics(ProcessGroupStatus status, 
boolean appendPgId) {
+        final Map<String,Integer> metrics = new HashMap<>();
+        metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, 
appendPgId), status.getFlowFilesReceived());
+        metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, 
appendPgId), status.getFlowFilesSent());
+        metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, 
appendPgId), status.getQueuedCount());
+        metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, 
appendPgId), status.getActiveThreadCount());
+        return metrics;
+    }
+
+    // collects the long-valued process group metrics (byte counts and total task
+    // duration), optionally suffixing each name with the group ID
+    private Map<String,Long> getLongMetrics(ProcessGroupStatus status, boolean 
appendPgId) {
+        final Map<String,Long> metrics = new HashMap<>();
+        metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, 
appendPgId), status.getBytesReceived());
+        metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), 
status.getBytesSent());
+        metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), 
status.getQueuedContentSize());
+        metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), 
status.getBytesRead());
+        metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), 
status.getBytesWritten());
+
+        // processing time summed over every processor in this group and its child groups
+        final long durationNanos = calculateProcessingNanos(status);
+        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, 
appendPgId), durationNanos);
+
+        // the same duration reported in whole seconds (the conversion truncates)
+        final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, 
TimeUnit.NANOSECONDS);
+        metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, 
status, appendPgId), durationSeconds);
+
+        return metrics;
+    }
+
+    /**
+     * Generates a Map of metrics for VirtualMachineMetrics. Every value is
+     * converted to its String representation.
+     *
+     * @param virtualMachineMetrics a VirtualMachineMetrics instance to get metrics from
+     * @return a map of metrics from the given VirtualMachineMetrics
+     */
+    public Map<String,String> getMetrics(VirtualMachineMetrics 
virtualMachineMetrics) {
+        final Map<String,String> metrics = new HashMap<>();
+
+        // thread counts and thread state percentages
+        Map<String,Integer> integerMetrics = 
getIntegerMetrics(virtualMachineMetrics);
+        for (String key : integerMetrics.keySet()) {
+            metrics.put(key, String.valueOf(integerMetrics.get(key)));
+        }
+
+        // uptime and per-collector garbage collection statistics
+        Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics);
+        for (String key : longMetrics.keySet()) {
+            metrics.put(key, String.valueOf(longMetrics.get(key)));
+        }
+
+        // heap, non-heap and file descriptor figures
+        Map<String,Double> doubleMetrics = 
getDoubleMetrics(virtualMachineMetrics);
+        for (String key : doubleMetrics.keySet()) {
+            metrics.put(key, String.valueOf(doubleMetrics.get(key)));
+        }
+
+        return metrics;
+    }
+
+    // calculates the total processing time of all processors in nanos,
+    // recursing into every child process group of the given status
+    protected long calculateProcessingNanos(final ProcessGroupStatus status) {
+        long nanos = 0L;
+
+        // processors that belong directly to this group
+        for (final ProcessorStatus procStats : status.getProcessorStatus()) {
+            nanos += procStats.getProcessingNanos();
+        }
+
+        // processors of nested groups, at any depth
+        for (final ProcessGroupStatus childGroupStatus : 
status.getProcessGroupStatus()) {
+            nanos += calculateProcessingNanos(childGroupStatus);
+        }
+
+        return nanos;
+    }
+
+    // append the process group ID if necessary: returns "<name>.<group id>" when
+    // appendPgId is true, otherwise the name unchanged
+    private String appendPgId(String name, ProcessGroupStatus status, boolean 
appendPgId) {
+        if(appendPgId) {
+            return name + MetricNames.METRIC_NAME_SEPARATOR + status.getId();
+        } else {
+            return name;
+        }
+    }
+
+    // collects the double-valued JVM metrics: heap used, heap usage, non-heap usage
+    // and file descriptor usage
+    private Map<String,Double> getDoubleMetrics(VirtualMachineMetrics 
virtualMachineMetrics) {
+        final Map<String,Double> metrics = new HashMap<>();
+        metrics.put(MetricNames.JVM_HEAP_USED, 
virtualMachineMetrics.heapUsed());
+        metrics.put(MetricNames.JVM_HEAP_USAGE, 
virtualMachineMetrics.heapUsage());
+        metrics.put(MetricNames.JVM_NON_HEAP_USAGE, 
virtualMachineMetrics.nonHeapUsage());
+        metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, 
virtualMachineMetrics.fileDescriptorUsage());
+        return metrics;
+    }
+
+    // collects the long-valued JVM metrics: uptime plus, for each garbage collector,
+    // its number of runs and its collection time in milliseconds
+    private Map<String,Long> getLongMetrics(VirtualMachineMetrics 
virtualMachineMetrics) {
+        final Map<String,Long> metrics = new HashMap<>();
+        metrics.put(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime());
+
+        for (Map.Entry<String,VirtualMachineMetrics.GarbageCollectorStats> 
entry : virtualMachineMetrics.garbageCollectors().entrySet()) {
+            // spaces are removed from the collector name before it is used in the metric name
+            final String gcName = entry.getKey().replace(" ", "");
+            final long runs = entry.getValue().getRuns();
+            final long timeMS = 
entry.getValue().getTime(TimeUnit.MILLISECONDS);
+            metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, runs);
+            metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, timeMS);
+        }
+
+        return metrics;
+    }
+
+    // collects the integer-valued JVM metrics: daemon thread count, thread count and
+    // the share of threads in the BLOCKED, RUNNABLE, TERMINATED and TIMED_WAITING states
+    private Map<String,Integer> getIntegerMetrics(VirtualMachineMetrics 
virtualMachineMetrics) {
+        final Map<String,Integer> metrics = new HashMap<>();
+        metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, 
virtualMachineMetrics.daemonThreadCount());
+        metrics.put(MetricNames.JVM_THREAD_COUNT, 
virtualMachineMetrics.threadCount());
+
+        for (Map.Entry<Thread.State,Double> entry : 
virtualMachineMetrics.threadStatePercentages().entrySet()) {
+            // the value is multiplied by 100 and truncated to an int; a null value counts as 0
+            final int normalizedValue = (int) (100 * (entry.getValue() == null 
? 0 : entry.getValue()));
+            switch(entry.getKey()) {
+                case BLOCKED:
+                    metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, 
normalizedValue);
+                    break;
+                case RUNNABLE:
+                    metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, 
normalizedValue);
+                    break;
+                case TERMINATED:
+                    metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, 
normalizedValue);
+                    break;
+                case TIMED_WAITING:
+                    metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, 
normalizedValue);
+                    break;
+                default:
+                    // every other thread state is not reported
+                    break;
+            }
+        }
+
+        return metrics;
+    }
+
+    /**
+     * Builds a single flat JsonObject holding the identification fields, the OS
+     * metrics, the JVM metrics and the process group metrics. Dots are removed
+     * from the JVM metric names; process group metric names never carry the
+     * process group ID.
+     *
+     * @param factory the factory used to create the JSON object builder
+     * @param status the ProcessGroupStatus to get metrics from; its ID is used as the instance ID
+     * @param virtualMachineMetrics the VirtualMachineMetrics to get JVM metrics from
+     * @param applicationId the value of the application ID field
+     * @param id not referenced by this method
+     * @param hostname the value of the hostname field
+     * @param currentTimeMillis the value of the timestamp field
+     * @param availableProcessors the value reported as available cores
+     * @param systemLoad the value reported as the one minute load average
+     * @return the JSON object containing all the fields above
+     */
+    public JsonObject getMetrics(JsonBuilderFactory factory, 
ProcessGroupStatus status, VirtualMachineMetrics virtualMachineMetrics,
+            String applicationId, String id, String hostname, long 
currentTimeMillis, int availableProcessors, double systemLoad) {
+        JsonObjectBuilder objectBuilder = factory.createObjectBuilder()
+                .add(MetricFields.APP_ID, applicationId)
+                .add(MetricFields.HOSTNAME, hostname)
+                .add(MetricFields.INSTANCE_ID, status.getId())
+                .add(MetricFields.TIMESTAMP, currentTimeMillis);
+
+        // OS metrics
+        objectBuilder
+        .add(MetricNames.CORES, availableProcessors)
+        .add(MetricNames.LOAD1MN, systemLoad);
+
+        // JVM metrics: the dots of the metric names are removed to form the field names
+        Map<String,Integer> integerMetrics = 
getIntegerMetrics(virtualMachineMetrics);
+        for (String key : integerMetrics.keySet()) {
+            objectBuilder.add(key.replaceAll("\\.", ""), 
integerMetrics.get(key));
+        }
+
+        Map<String,Long> longMetrics = getLongMetrics(virtualMachineMetrics);
+        for (String key : longMetrics.keySet()) {
+            objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key));
+        }
+
+        Map<String,Double> doubleMetrics = 
getDoubleMetrics(virtualMachineMetrics);
+        for (String key : doubleMetrics.keySet()) {
+            objectBuilder.add(key.replaceAll("\\.", ""), 
doubleMetrics.get(key));
+        }
+
+        // process group metrics, with names left as they are (no process group ID suffix)
+        Map<String,Long> longPgMetrics = getLongMetrics(status, false);
+        for (String key : longPgMetrics.keySet()) {
+            objectBuilder.add(key, longPgMetrics.get(key));
+        }
+
+        Map<String,Integer> integerPgMetrics = getIntegerMetrics(status, 
false);
+        for (String key : integerPgMetrics.keySet()) {
+            objectBuilder.add(key, integerPgMetrics.get(key));
+        }
+
+        return objectBuilder.build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
new file mode 100644
index 0000000..81fb021
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+
+/**
+ * Builds the JsonObject for an individual metric.
+ *
+ * The setters return this builder so calls can be chained, and build() does not
+ * reset any field, so one instance can be reused for several metrics.
+ */
+public class MetricBuilder {
+
+    private final JsonBuilderFactory factory;
+
+    private String applicationId;
+    private String instanceId;
+    private String hostname;
+    // kept as a String: it is emitted as a string value and also used as a JSON key
+    private String timestamp;
+    private String metricName;
+    private String metricValue;
+
+    /**
+     * @param factory the factory used to create the JSON object builders
+     */
+    public MetricBuilder(final JsonBuilderFactory factory) {
+        this.factory = factory;
+    }
+
+    public MetricBuilder applicationId(final String applicationId) {
+        this.applicationId = applicationId;
+        return this;
+    }
+
+    public MetricBuilder instanceId(final String instanceId) {
+        this.instanceId = instanceId;
+        return this;
+    }
+
+    public MetricBuilder hostname(final String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    // stores the decimal String form of the given timestamp
+    public MetricBuilder timestamp(final long timestamp) {
+        this.timestamp = String.valueOf(timestamp);
+        return this;
+    }
+
+    public MetricBuilder metricName(final String metricName) {
+        this.metricName = metricName;
+        return this;
+    }
+
+    public MetricBuilder metricValue(final String metricValue) {
+        this.metricValue = metricValue;
+        return this;
+    }
+
+    /**
+     * Builds the JSON object for the current field values. The same timestamp is
+     * written as both the timestamp and the start time, and is also the single key
+     * of the nested metrics object whose value is the metric value.
+     *
+     * NOTE(review): fields that were never set are still null here; confirm how
+     * JsonObjectBuilder.add behaves for null values.
+     *
+     * @return the JSON object describing one metric
+     */
+    public JsonObject build() {
+        return factory.createObjectBuilder()
+                .add(MetricFields.METRIC_NAME, metricName)
+                .add(MetricFields.APP_ID, applicationId)
+                .add(MetricFields.INSTANCE_ID, instanceId)
+                .add(MetricFields.HOSTNAME, hostname)
+                .add(MetricFields.TIMESTAMP, timestamp)
+                .add(MetricFields.START_TIME, timestamp)
+                .add(MetricFields.METRICS,
+                        factory.createObjectBuilder()
+                                .add(String.valueOf(timestamp), metricValue)
+                ).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
new file mode 100644
index 0000000..4c451ea
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+/**
+ * The JSON field names used when metrics are serialized to JSON objects.
+ */
+public interface MetricFields {
+
+    String METRIC_NAME = "metricname";
+    String APP_ID = "appid";
+    String INSTANCE_ID = "instanceid";
+    String HOSTNAME = "hostname";
+    String TIMESTAMP = "timestamp";
+    String START_TIME = "starttime";
+    // name of the nested object that maps a timestamp to a metric value
+    String METRICS = "metrics";
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
new file mode 100644
index 0000000..3694720
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.metrics.api;
+
+import javax.json.JsonArrayBuilder;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+import javax.json.JsonObjectBuilder;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builds the overall JsonObject for the Metrics: a top-level object whose
+ * "metrics" element is an array with one entry per registered metric.
+ */
+public class MetricsBuilder {
+
+    // name of the top-level element holding the array of metrics
+    static final String ROOT_JSON_ELEMENT = "metrics";
+
+    private final JsonBuilderFactory factory;
+
+    private long timestamp;
+    private String applicationId;
+    private String instanceId;
+    private String hostname;
+    // metric name -> metric value; adding a name twice keeps the last value
+    private Map<String,String> metrics = new HashMap<>();
+
+    /**
+     * @param factory the factory used to create the JSON builders
+     */
+    public MetricsBuilder(final JsonBuilderFactory factory) {
+        this.factory = factory;
+    }
+
+    public MetricsBuilder applicationId(final String applicationId) {
+        this.applicationId = applicationId;
+        return this;
+    }
+
+    public MetricsBuilder instanceId(final String instanceId) {
+        this.instanceId = instanceId;
+        return this;
+    }
+
+    public MetricsBuilder hostname(final String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    public MetricsBuilder timestamp(final long timestamp) {
+        this.timestamp = timestamp;
+        return this;
+    }
+
+    // registers a single metric
+    public MetricsBuilder metric(final String name, String value) {
+        this.metrics.put(name, value);
+        return this;
+    }
+
+    // registers every entry of the given map as a metric
+    public MetricsBuilder addAllMetrics(final Map<String,String> metrics) {
+        this.metrics.putAll(metrics);
+        return this;
+    }
+
+    /**
+     * Builds the top-level JSON object. All metrics share the application ID,
+     * instance ID, hostname and timestamp set on this builder.
+     *
+     * @return a JSON object whose "metrics" element is the array of individual metrics
+     */
+    public JsonObject build() {
+        // builds JsonObject for individual metrics
+        final MetricBuilder metricBuilder = new MetricBuilder(factory);
+        
metricBuilder.instanceId(instanceId).applicationId(applicationId).timestamp(timestamp).hostname(hostname);
+
+        final JsonArrayBuilder metricArrayBuilder = 
factory.createArrayBuilder();
+
+        // the same MetricBuilder is reused: only the name and value change per entry
+        for (Map.Entry<String,String> entry : metrics.entrySet()) {
+            
metricBuilder.metricName(entry.getKey()).metricValue(entry.getValue());
+            metricArrayBuilder.add(metricBuilder.build());
+        }
+
+        // add the array of metrics to a top-level json object
+        final JsonObjectBuilder metricsBuilder = factory.createObjectBuilder();
+        metricsBuilder.add(ROOT_JSON_ELEMENT, metricArrayBuilder);
+        return metricsBuilder.build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
index d883529..b664e57 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml
@@ -55,6 +55,24 @@
             <version>1.6.0-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.6.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-avro-record-utils</artifactId>
+            <version>1.6.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
             <groupId>org.glassfish</groupId>
             <artifactId>javax.json</artifactId>
             <version>1.0.4</version>
@@ -83,10 +101,30 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.6.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        
<exclude>src/main/resources/schema-metrics.avsc</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+    
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
index 28106a6..c303203 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java
@@ -16,6 +16,18 @@
  */
 package org.apache.nifi.reporting;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.JsonObjectBuilder;
+import javax.json.JsonValue;
+import javax.net.ssl.SSLContext;
+
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -24,27 +36,39 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
 import org.apache.nifi.remote.protocol.http.HttpProxy;
 import org.apache.nifi.remote.util.SiteToSiteRestApiClient;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StringUtils;
 
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Base class for ReportingTasks that send data over site-to-site.
  */
 public abstract class AbstractSiteToSiteReportingTask extends 
AbstractReportingTask {
+
+    protected static final String LAST_EVENT_ID_KEY = "last_event_id";
     protected static final String DESTINATION_URL_PATH = "/nifi";
+    protected static final String TIMESTAMP_FORMAT = 
"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+
+    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
+    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
+    private final String timestampFormat = 
RecordFieldType.TIMESTAMP.getDefaultFormat();
 
     static final PropertyDescriptor DESTINATION_URL = new 
PropertyDescriptor.Builder()
             .name("Destination URL")
@@ -140,8 +164,16 @@ public abstract class AbstractSiteToSiteReportingTask 
extends AbstractReportingT
             .sensitive(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .build();
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing 
out the records.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(false)
+            .build();
 
     protected volatile SiteToSiteClient siteToSiteClient;
+    protected volatile RecordSchema recordSchema;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -187,7 +219,7 @@ public abstract class AbstractSiteToSiteReportingTask 
extends AbstractReportingT
         final SiteToSiteTransportProtocol mode = 
SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
         final HttpProxy httpProxy = 
mode.equals(SiteToSiteTransportProtocol.RAW) || 
StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null
                 : new 
HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), 
context.getProperty(HTTP_PROXY_PORT).asInteger(),
-                context.getProperty(HTTP_PROXY_USERNAME).getValue(), 
context.getProperty(HTTP_PROXY_PASSWORD).getValue());
+                        context.getProperty(HTTP_PROXY_USERNAME).getValue(), 
context.getProperty(HTTP_PROXY_PASSWORD).getValue());
 
         siteToSiteClient = new SiteToSiteClient.Builder()
                 .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl))
@@ -214,6 +246,33 @@ public abstract class AbstractSiteToSiteReportingTask 
extends AbstractReportingT
         return this.siteToSiteClient;
     }
 
+    /**
+     * Re-encodes a JSON payload using the configured Record Writer.
+     * <p>
+     * The input is parsed as JSON records against {@code recordSchema}; every record is then written
+     * through the {@link RecordSetWriterFactory} referenced by the {@code RECORD_WRITER} property.
+     * The writer's MIME type and the attributes of its write result are copied into {@code attributes}.
+     *
+     * @param context    reporting context used to resolve the Record Writer controller service
+     * @param in         JSON input to convert
+     * @param attributes mutable map receiving the MIME type and the writer-provided attributes
+     * @return the bytes produced by the record writer
+     * @throws ProcessException if no Record Writer is configured, or if reading or writing the records fails
+     */
+    protected byte[] getData(final ReportingContext context, InputStream in, Map<String, String> attributes) {
+        try (final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), recordSchema, dateFormat, timeFormat, timestampFormat)) {
+
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+            if (writerFactory == null) {
+                // RECORD_WRITER is optional, so fail with a clear message instead of a bare NullPointerException.
+                throw new ProcessException("Failed to write metrics using record writer: no Record Writer is configured");
+            }
+
+            final RecordSchema writeSchema = writerFactory.getSchema(null, recordSchema);
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) {
+                writer.beginRecordSet();
+
+                Record record;
+                while ((record = reader.nextRecord()) != null) {
+                    writer.write(record);
+                }
+
+                final WriteResult writeResult = writer.finishRecordSet();
+
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.putAll(writeResult.getAttributes());
+            }
+
+            // The writer is closed at this point, so the buffer holds the complete output.
+            return out.toByteArray();
+        } catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
+            throw new ProcessException("Failed to write metrics using record writer: " + e.getMessage(), e);
+        }
+    }
+
+
     static class NiFiUrlValidator implements Validator {
         @Override
         public ValidationResult validate(final String subject, final String 
input, final ValidationContext context) {
@@ -235,4 +294,34 @@ public abstract class AbstractSiteToSiteReportingTask 
extends AbstractReportingT
             }
         }
     }
+
+    /**
+     * Adds a numeric field to the JSON object under construction.
+     * A {@code null} value is skipped, so the key is simply absent from the output.
+     *
+     * @param builder JSON object builder receiving the field
+     * @param key     field name
+     * @param value   field value, may be {@code null}
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final Long value) {
+        if (value == null) {
+            return;
+        }
+
+        builder.add(key, value.longValue());
+    }
+
+    /**
+     * Adds an integer field to the JSON object under construction.
+     * A {@code null} value is skipped, so the key is simply absent from the output.
+     *
+     * @param builder JSON object builder receiving the field
+     * @param key     field name
+     * @param value   field value, may be {@code null}
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) {
+        if (value == null) {
+            return;
+        }
+
+        builder.add(key, value.intValue());
+    }
+
+    /**
+     * Adds a string field to the JSON object under construction.
+     * A {@code null} value is skipped, so the key is simply absent from the output.
+     *
+     * @param builder JSON object builder receiving the field
+     * @param key     field name
+     * @param value   field value, may be {@code null}
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final String value) {
+        if (value != null) {
+            builder.add(key, value);
+        }
+    }
+
+    /**
+     * Adds a string field to the JSON object under construction, optionally keeping nulls.
+     *
+     * @param builder         JSON object builder receiving the field
+     * @param key             field name
+     * @param value           field value, may be {@code null}
+     * @param allowNullValues when {@code true} a {@code null} value is written as JSON null,
+     *                        otherwise the key is left out
+     */
+    protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) {
+        if (value != null) {
+            builder.add(key, value);
+            return;
+        }
+
+        if (allowNullValues) {
+            builder.add(key, JsonValue.NULL);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
index 03d8f3b..eddf4be 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java
@@ -67,9 +67,6 @@ import java.util.concurrent.TimeUnit;
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class SiteToSiteBulletinReportingTask extends 
AbstractSiteToSiteReportingTask {
 
-    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-    static final String LAST_EVENT_ID_KEY = "last_event_id";
-
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
         .description("The value to use for the platform field in each 
provenance event.")
@@ -194,7 +191,7 @@ public class SiteToSiteBulletinReportingTask extends 
AbstractSiteToSiteReporting
         lastSentBulletinId = currMaxId;
     }
 
-    static JsonObject serialize(final JsonBuilderFactory factory, final 
JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
+    private JsonObject serialize(final JsonBuilderFactory factory, final 
JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df,
         final String platform, final String nodeIdentifier) {
 
         addField(builder, "objectId", UUID.randomUUID().toString());
@@ -215,17 +212,4 @@ public class SiteToSiteBulletinReportingTask extends 
AbstractSiteToSiteReporting
         return builder.build();
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final Long value) {
-        if (value != null) {
-            builder.add(key, value.longValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final String value) {
-        if (value == null) {
-            return;
-        }
-        builder.add(key, value);
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
new file mode 100644
index 0000000..0bb7501
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.Json;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.reporting.util.metrics.MetricNames;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+
+/**
+ * Reporting task that sends NiFi status and JVM metrics to a remote instance over Site-to-Site.
+ * The payload is either an Ambari-style JSON document or the output of a configured Record Writer.
+ */
+@Tags({"status", "metrics", "site", "site to site"})
+@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.")
+public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask {
+
+    /** Classpath resource holding the Avro schema used for the record format. */
+    private static final String SCHEMA_RESOURCE = "schema-metrics.avsc";
+
+    static final AllowableValue AMBARI_FORMAT = new AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted"
+            + " according to the Ambari Metrics API. See Additional Details in Usage documentation.");
+    static final AllowableValue RECORD_FORMAT = new AllowableValue("record-format", "Record Format", "Metrics will be formatted"
+            + " using the Record Writer property of this reporting task. See Additional Details in Usage documentation to"
+            + " have the description of the default schema.");
+
+    static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder()
+            .name("s2s-metrics-application-id")
+            .displayName("Application ID")
+            .description("The Application ID to be included in the metrics")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .defaultValue("nifi")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("s2s-metrics-hostname")
+            .displayName("Hostname")
+            .description("The Hostname of this NiFi instance to be included in the metrics")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .defaultValue("${hostname(true)}")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
+            .name("s2s-metrics-format")
+            .displayName("Output Format")
+            .description("The output format that will be used for the metrics. If " + RECORD_FORMAT.getDisplayName() + " is selected, "
+                    + "a Record Writer must be provided. If " + AMBARI_FORMAT.getDisplayName() + " is selected, the Record Writer property "
+                    + "should be empty.")
+            .required(true)
+            .allowableValues(AMBARI_FORMAT, RECORD_FORMAT)
+            .defaultValue(AMBARI_FORMAT.getValue())
+            .addValidator(Validator.VALID)
+            .build();
+
+    private final MetricsService metricsService = new MetricsService();
+
+    /**
+     * Loads the record schema from the {@code schema-metrics.avsc} classpath resource.
+     *
+     * @throws IOException if the resource is missing or cannot be read
+     */
+    public SiteToSiteMetricsReportingTask() throws IOException {
+        // try-with-resources: the original never closed the resource stream after parsing.
+        try (final InputStream schema = getClass().getClassLoader().getResourceAsStream(SCHEMA_RESOURCE)) {
+            if (schema == null) {
+                throw new IOException("Unable to find the metrics schema resource '" + SCHEMA_RESOURCE + "' on the classpath");
+            }
+            recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
+        }
+    }
+
+    /**
+     * Returns the inherited descriptors plus hostname, application id, output format and record writer,
+     * with the batch size property removed.
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(HOSTNAME);
+        properties.add(APPLICATION_ID);
+        properties.add(FORMAT);
+        properties.add(RECORD_WRITER);
+        properties.remove(BATCH_SIZE);
+        return properties;
+    }
+
+    /**
+     * Checks that a Record Writer is set exactly when the record format is selected.
+     *
+     * @param validationContext context giving access to the configured property values
+     * @return the inherited validation results plus any format/writer mismatch
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+
+        final String format = validationContext.getProperty(FORMAT).getValue();
+        final boolean isWriterSet = validationContext.getProperty(RECORD_WRITER).isSet();
+
+        // Constant-first comparison so a null format value cannot throw here.
+        if (RECORD_FORMAT.getValue().equals(format) && !isWriterSet) {
+            problems.add(new ValidationResult.Builder()
+                    .input("Record Writer")
+                    .valid(false)
+                    .explanation("If using " + RECORD_FORMAT.getDisplayName() + ", a record writer needs to be set.")
+                    .build());
+        }
+        if (AMBARI_FORMAT.getValue().equals(format) && isWriterSet) {
+            problems.add(new ValidationResult.Builder()
+                    .input("Record Writer")
+                    .valid(false)
+                    .explanation("If using " + AMBARI_FORMAT.getDisplayName() + ", no record writer should be set.")
+                    .build());
+        }
+
+        return problems;
+    }
+
+    /**
+     * Collects the current metrics, serializes them in the configured format and sends them
+     * in a single Site-to-Site transaction.
+     *
+     * @param context reporting context giving access to properties, cluster state and controller status
+     * @throws ProcessException if the metrics cannot be sent to the destination
+     */
+    @Override
+    public void onTrigger(final ReportingContext context) {
+        final boolean isClustered = context.isClustered();
+        final String nodeId = context.getClusterNodeIdentifier();
+        if (nodeId == null && isClustered) {
+            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+                    + "Will wait for Node Identifier to be established.");
+            return;
+        }
+
+        final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+        final Map<String, ?> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+
+        final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
+        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
+
+        if (status == null) {
+            getLogger().error("No process group status to retrieve metrics");
+            return;
+        }
+
+        final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+        final double systemLoad = os.getSystemLoadAverage();
+        // A negative load average is normalised to -1 in the reported metrics.
+        final double reportedLoad = systemLoad >= 0 ? systemLoad : -1;
+
+        final byte[] data;
+        final Map<String, String> attributes = new HashMap<>();
+
+        if (context.getProperty(FORMAT).getValue().equals(AMBARI_FORMAT.getValue())) {
+            // Only the Ambari format needs the flat metric maps and the builder.
+            final Map<String, String> statusMetrics = metricsService.getMetrics(status, false);
+            final Map<String, String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
+            final MetricsBuilder metricsBuilder = new MetricsBuilder(factory);
+
+            final JsonObject metricsObject = metricsBuilder
+                    .applicationId(applicationId)
+                    .instanceId(status.getId())
+                    .hostname(hostname)
+                    .timestamp(System.currentTimeMillis())
+                    .addAllMetrics(statusMetrics)
+                    .addAllMetrics(jvmMetrics)
+                    .metric(MetricNames.CORES, String.valueOf(os.getAvailableProcessors()))
+                    .metric(MetricNames.LOAD1MN, String.valueOf(reportedLoad))
+                    .build();
+
+            data = metricsObject.toString().getBytes(StandardCharsets.UTF_8);
+            attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        } else {
+            final JsonObject metricsObject = metricsService.getMetrics(factory, status, virtualMachineMetrics, applicationId, status.getId(),
+                    hostname, System.currentTimeMillis(), os.getAvailableProcessors(), reportedLoad);
+            data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes);
+        }
+
+        try {
+            final long start = System.nanoTime();
+            final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
+            if (transaction == null) {
+                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
+                return;
+            }
+
+            final String transactionId = UUID.randomUUID().toString();
+            attributes.put("reporting.task.transaction.id", transactionId);
+            attributes.put("reporting.task.name", getName());
+            attributes.put("reporting.task.uuid", getIdentifier());
+            attributes.put("reporting.task.type", this.getClass().getSimpleName());
+
+            transaction.send(data, attributes);
+            transaction.confirm();
+            transaction.complete();
+
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId});
+        } catch (final Exception e) {
+            throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/930417b9/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index f7a59db..fe407eb 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -45,7 +45,6 @@ import javax.json.JsonArrayBuilder;
 import javax.json.JsonBuilderFactory;
 import javax.json.JsonObject;
 import javax.json.JsonObjectBuilder;
-import javax.json.JsonValue;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -75,9 +74,6 @@ import java.util.concurrent.TimeUnit;
 )
 public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReportingTask {
 
-    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-    static final String LAST_EVENT_ID_KEY = "last_event_id";
-
     static final AllowableValue BEGINNING_OF_STREAM = new 
AllowableValue("beginning-of-stream", "Beginning of Stream",
         "Start reading provenance Events from the beginning of the stream (the 
oldest event first)");
     static final AllowableValue END_OF_STREAM = new 
AllowableValue("end-of-stream", "End of Stream",
@@ -306,7 +302,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
     }
 
 
-    static JsonObject serialize(final JsonBuilderFactory factory, final 
JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat 
df,
+    private JsonObject serialize(final JsonBuilderFactory factory, final 
JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat 
df,
                                 final String componentName, final String 
processGroupId, final String processGroupName, final String hostname, final URL 
nifiUrl, final String applicationName,
                                 final String platform, final String 
nodeIdentifier) {
         addField(builder, "eventId", UUID.randomUUID().toString());
@@ -370,13 +366,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
         builder.add(key, mapBuilder);
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final Long value) {
-        if (value != null) {
-            builder.add(key, value.longValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final 
JsonBuilderFactory factory, final String key, final Collection<String> values) {
+    private void addField(final JsonObjectBuilder builder, final 
JsonBuilderFactory factory, final String key, final Collection<String> values) {
         if (values == null) {
             return;
         }
@@ -384,20 +374,6 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
         builder.add(key, createJsonArray(factory, values));
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final String value) {
-        addField(builder, key, value, false);
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final String value, final boolean allowNullValues) {
-        if (value == null) {
-            if (allowNullValues) {
-                builder.add(key, JsonValue.NULL);
-            }
-        } else {
-            builder.add(key, value);
-        }
-    }
-
     private static JsonArrayBuilder createJsonArray(JsonBuilderFactory 
factory, final Collection<String> values) {
         final JsonArrayBuilder builder = factory.createArrayBuilder();
         for (final String value : values) {

Reply via email to