Repository: nifi
Updated Branches:
  refs/heads/master ce0855e98 -> 6fbe1515e


http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
new file mode 100644
index 0000000..20416e1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import javax.json.Json;
+import javax.json.JsonBuilderFactory;
+import javax.json.JsonObject;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.reporting.util.metrics.MetricNames;
+import org.apache.nifi.reporting.util.metrics.MetricsService;
+import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder;
+
+import com.yammer.metrics.core.VirtualMachineMetrics;
+
+@Tags({"status", "metrics", "site", "site to site"})
+@CapabilityDescription("Publishes same metrics as the Ambari Reporting task 
using the Site To Site protocol.")
+public class SiteToSiteMetricsReportingTask extends 
AbstractSiteToSiteReportingTask {
+
+    /** Allowable value for the output format property: metrics formatted according to the Ambari Metrics API. */
+    static final AllowableValue AMBARI_FORMAT = new 
AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted"
+            + " according to the Ambari Metrics API. See Additional Details in 
Usage documentation.");
+    /** Allowable value for the output format property: metrics formatted using the configured Record Writer. */
+    static final AllowableValue RECORD_FORMAT = new 
AllowableValue("record-format", "Record Format", "Metrics will be formatted"
+            + " using the Record Writer property of this reporting task. See 
Additional Details in Usage documentation to"
+            + " have the description of the default schema.");
+
+    /** Required property holding the application ID included in the metrics; defaults to "nifi". */
+    static final PropertyDescriptor APPLICATION_ID = new 
PropertyDescriptor.Builder()
+            .name("s2s-metrics-application-id")
+            .displayName("Application ID")
+            .description("The Application ID to be included in the metrics")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("nifi")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Required property holding the hostname included in the metrics; defaults to the hostname(true) expression. */
+    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("s2s-metrics-hostname")
+            .displayName("Hostname")
+            .description("The Hostname of this NiFi instance to be included in 
the metrics")
+            .required(true)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("${hostname(true)}")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Required property selecting the output format (Ambari or Record); defaults to the Ambari format. */
+    static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
+            .name("s2s-metrics-format")
+            .displayName("Output Format")
+            .description("The output format that will be used for the metrics. 
If " + RECORD_FORMAT.getDisplayName() + " is selected, "
+                    + "a Record Writer must be provided. If " + 
AMBARI_FORMAT.getDisplayName() + " is selected, the Record Writer property "
+                    + "should be empty.")
+            .required(true)
+            .allowableValues(AMBARI_FORMAT, RECORD_FORMAT)
+            .defaultValue(AMBARI_FORMAT.getValue())
+            .addValidator(Validator.VALID)
+            .build();
+
+    /** Service used by onTrigger to build the metrics from the controller status and the JVM metrics. */
+    private final MetricsService metricsService = new MetricsService();
+
+    /**
+     * Creates the reporting task and loads the record schema from the
+     * {@code schema-metrics.avsc} classpath resource.
+     *
+     * @throws IOException if the schema resource cannot be found or cannot be read
+     */
+    public SiteToSiteMetricsReportingTask() throws IOException {
+        // The stream is opened here, so it is closed here; previously it was never closed.
+        try (final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-metrics.avsc")) {
+            if (schema == null) {
+                // getResourceAsStream returns null when the resource is absent; fail with a clear message
+                throw new IOException("Unable to find resource schema-metrics.avsc on the classpath");
+            }
+            recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema));
+        }
+    }
+
+    /**
+     * Returns the property descriptors of the parent class, followed by the hostname,
+     * application ID, output format and record writer properties, with the batch size
+     * property removed.
+     *
+     * @return the list of supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        Collections.addAll(descriptors, HOSTNAME, APPLICATION_ID, FORMAT, RECORD_WRITER);
+        descriptors.remove(BATCH_SIZE);
+        return descriptors;
+    }
+
+    /**
+     * Adds, to the validation results of the parent class, a check that the Record Writer
+     * property agrees with the selected output format: it must be set for the record format
+     * and must not be set for the Ambari format.
+     *
+     * @param validationContext the context giving access to the configured property values
+     * @return the parent's validation results plus any Record Writer inconsistency found
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        final boolean writerConfigured = validationContext.getProperty(RECORD_WRITER).isSet();
+        final String selectedFormat = validationContext.getProperty(FORMAT).getValue();
+        final boolean recordFormatSelected = selectedFormat.equals(RECORD_FORMAT.getValue());
+        final boolean ambariFormatSelected = selectedFormat.equals(AMBARI_FORMAT.getValue());
+
+        if (recordFormatSelected && !writerConfigured) {
+            results.add(invalidRecordWriter("If using " + RECORD_FORMAT.getDisplayName() + ", a record writer needs to be set."));
+        }
+        if (ambariFormatSelected && writerConfigured) {
+            results.add(invalidRecordWriter("If using " + AMBARI_FORMAT.getDisplayName() + ", no record writer should be set."));
+        }
+
+        return results;
+    }
+
+    /** Builds a failed validation result for the Record Writer property with the given explanation. */
+    private static ValidationResult invalidRecordWriter(final String explanation) {
+        return new ValidationResult.Builder()
+                .input("Record Writer")
+                .valid(false)
+                .explanation(explanation)
+                .build();
+    }
+
+    /**
+     * Collects the controller status metrics, the JVM metrics and the operating-system
+     * core count and load average, serializes them in the configured output format and
+     * sends them to the destination in a single Site-to-Site transaction.
+     *
+     * <p>Nothing is sent when the instance is clustered but its node identifier is not yet
+     * known, when no controller status is available, or when no transaction can be created.</p>
+     *
+     * @param context the reporting context giving access to properties, cluster state and status
+     * @throws ProcessException if the metrics cannot be sent to the destination
+     */
+    @Override
+    public void onTrigger(final ReportingContext context) {
+        final boolean isClustered = context.isClustered();
+        final String nodeId = context.getClusterNodeIdentifier();
+        if (nodeId == null && isClustered) {
+            getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. "
+                    + "Will wait for Node Identifier to be established.");
+            return;
+        }
+
+        final ProcessGroupStatus status = context.getEventAccess().getControllerStatus();
+        if (status == null) {
+            getLogger().error("No process group status to retrieve metrics");
+            return;
+        }
+
+        final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance();
+        final Map<String, ?> config = Collections.emptyMap();
+        final JsonBuilderFactory factory = Json.createBuilderFactory(config);
+
+        final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue();
+        final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+
+        final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
+        // getSystemLoadAverage() returns a negative value when the load average is unavailable; report -1 in that case
+        final double rawSystemLoad = os.getSystemLoadAverage();
+        final double systemLoad = rawSystemLoad >= 0 ? rawSystemLoad : -1;
+
+        final byte[] data;
+        final Map<String, String> attributes = new HashMap<>();
+
+        if (context.getProperty(FORMAT).getValue().equals(AMBARI_FORMAT.getValue())) {
+            // the flat metric maps are only needed for the Ambari format, so build them here
+            final Map<String, String> statusMetrics = metricsService.getMetrics(status, false);
+            final Map<String, String> jvmMetrics = metricsService.getMetrics(virtualMachineMetrics);
+
+            final JsonObject metricsObject = new MetricsBuilder(factory)
+                    .applicationId(applicationId)
+                    .instanceId(status.getId())
+                    .hostname(hostname)
+                    .timestamp(System.currentTimeMillis())
+                    .addAllMetrics(statusMetrics)
+                    .addAllMetrics(jvmMetrics)
+                    .metric(MetricNames.CORES, String.valueOf(os.getAvailableProcessors()))
+                    .metric(MetricNames.LOAD1MN, String.valueOf(systemLoad))
+                    .build();
+
+            data = metricsObject.toString().getBytes(StandardCharsets.UTF_8);
+            attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        } else {
+            final JsonObject metricsObject = metricsService.getMetrics(factory, status, virtualMachineMetrics, applicationId, status.getId(),
+                    hostname, System.currentTimeMillis(), os.getAvailableProcessors(), systemLoad);
+            data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes);
+        }
+
+        Transaction transaction = null;
+        try {
+            final long start = System.nanoTime();
+            transaction = getClient().createTransaction(TransferDirection.SEND);
+            if (transaction == null) {
+                getLogger().debug("All destination nodes are penalized; will attempt to send data later");
+                return;
+            }
+
+            final String transactionId = UUID.randomUUID().toString();
+            attributes.put("reporting.task.transaction.id", transactionId);
+            attributes.put("reporting.task.name", getName());
+            attributes.put("reporting.task.uuid", getIdentifier());
+            attributes.put("reporting.task.type", this.getClass().getSimpleName());
+
+            transaction.send(data, attributes);
+            transaction.confirm();
+            transaction.complete();
+
+            final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId});
+        } catch (final Exception e) {
+            // mark the transaction as failed so it is closed and the peer is notified, instead of leaving it dangling
+            if (transaction != null) {
+                transaction.error();
+            }
+            throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 6db30b8..ec1414d 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -46,7 +46,6 @@ import javax.json.JsonArrayBuilder;
 import javax.json.JsonBuilderFactory;
 import javax.json.JsonObject;
 import javax.json.JsonObjectBuilder;
-import javax.json.JsonValue;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -76,9 +75,6 @@ import java.util.concurrent.TimeUnit;
 )
 public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReportingTask {
 
-    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-    static final String LAST_EVENT_ID_KEY = "last_event_id";
-
     static final AllowableValue BEGINNING_OF_STREAM = new 
AllowableValue("beginning-of-stream", "Beginning of Stream",
         "Start reading provenance Events from the beginning of the stream (the 
oldest event first)");
     static final AllowableValue END_OF_STREAM = new 
AllowableValue("end-of-stream", "End of Stream",
@@ -307,7 +303,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
     }
 
 
-    static JsonObject serialize(final JsonBuilderFactory factory, final 
JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat 
df,
+    private JsonObject serialize(final JsonBuilderFactory factory, final 
JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat 
df,
                                 final String componentName, final String 
processGroupId, final String processGroupName, final String hostname, final URL 
nifiUrl, final String applicationName,
                                 final String platform, final String 
nodeIdentifier) {
         addField(builder, "eventId", UUID.randomUUID().toString());
@@ -371,13 +367,7 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
         builder.add(key, mapBuilder);
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final Long value) {
-        if (value != null) {
-            builder.add(key, value.longValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final 
JsonBuilderFactory factory, final String key, final Collection<String> values) {
+    private void addField(final JsonObjectBuilder builder, final 
JsonBuilderFactory factory, final String key, final Collection<String> values) {
         if (values == null) {
             return;
         }
@@ -385,20 +375,6 @@ public class SiteToSiteProvenanceReportingTask extends 
AbstractSiteToSiteReporti
         builder.add(key, createJsonArray(factory, values));
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final String value) {
-        addField(builder, key, value, false);
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final String value, final boolean allowNullValues) {
-        if (value == null) {
-            if (allowNullValues) {
-                builder.add(key, JsonValue.NULL);
-            }
-        } else {
-            builder.add(key, value);
-        }
-    }
-
     private static JsonArrayBuilder createJsonArray(JsonBuilderFactory 
factory, final Collection<String> values) {
         final JsonArrayBuilder builder = factory.createArrayBuilder();
         for (final String value : values) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
index 75e9811..46b465d 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java
@@ -61,8 +61,6 @@ import org.apache.nifi.remote.TransferDirection;
         + "However, all process groups are recursively searched for matching 
components, regardless of whether the process group matches the component 
filters.")
 public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTask {
 
-    static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
-
     static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
         .name("Platform")
         .description("The value to use for the platform field in each status 
record.")
@@ -71,6 +69,7 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
         .defaultValue("nifi")
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
+
     static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new 
PropertyDescriptor.Builder()
         .name("Component Type Filter Regex")
         .description("A regex specifying which component types to report.  Any 
component type matching this regex will be included.  "
@@ -80,6 +79,7 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
         
.defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)")
         .addValidator(StandardValidators.createRegexValidator(0, 
Integer.MAX_VALUE, true))
         .build();
+
     static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new 
PropertyDescriptor.Builder()
         .name("Component Name Filter Regex")
         .description("A regex specifying which component names to report.  Any 
component name matching this regex will be included.")
@@ -198,7 +198,7 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
      *            The component name
      * @return Whether the component matches both filters
      */
-    boolean componentMatchesFilters(final String componentType, final String 
componentName) {
+    private boolean componentMatchesFilters(final String componentType, final 
String componentName) {
         return componentTypeFilter.matcher(componentType).matches()
                 && componentNameFilter.matcher(componentName).matches();
     }
@@ -222,7 +222,7 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
      * @param parentId
      *            The parent's component id
      */
-    void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, 
final JsonBuilderFactory factory,
+    private void serializeProcessGroupStatus(final JsonArrayBuilder 
arrayBuilder, final JsonBuilderFactory factory,
             final ProcessGroupStatus status, final DateFormat df,
         final String hostname, final String applicationName, final String 
platform, final String parentId, final Date currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
@@ -279,7 +279,7 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
         }
     }
 
-    void serializeRemoteProcessGroupStatus(final JsonArrayBuilder 
arrayBuilder, final JsonBuilderFactory factory,
+    private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder 
arrayBuilder, final JsonBuilderFactory factory,
             final RemoteProcessGroupStatus status, final DateFormat df, final 
String hostname, final String applicationName,
             final String platform, final String parentId, final Date 
currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
@@ -304,7 +304,7 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
         }
     }
 
-    void serializePortStatus(final String componentType, final 
JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final 
PortStatus status,
+    private void serializePortStatus(final String componentType, final 
JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final 
PortStatus status,
             final DateFormat df, final String hostname, final String 
applicationName, final String platform, final String parentId, final Date 
currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
         final String componentName = status.getName();
@@ -328,7 +328,7 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
         }
     }
 
-    void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final 
JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df,
+    private void serializeConnectionStatus(final JsonArrayBuilder 
arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, 
final DateFormat df,
             final String hostname, final String applicationName, final String 
platform, final String parentId, final Date currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
         final String componentType = "Connection";
@@ -356,7 +356,7 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
         }
     }
 
-    void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final 
JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df,
+    private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, 
final JsonBuilderFactory factory, final ProcessorStatus status, final 
DateFormat df,
             final String hostname, final String applicationName, final String 
platform, final String parentId, final Date currentDate) {
         final JsonObjectBuilder builder = factory.createObjectBuilder();
         final String componentType = "Processor";
@@ -387,7 +387,7 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
         }
     }
 
-    private static void addCommonFields(final JsonObjectBuilder builder, final 
DateFormat df, final String hostname,
+    private void addCommonFields(final JsonObjectBuilder builder, final 
DateFormat df, final String hostname,
             final String applicationName, final String platform, final String 
parentId, final Date currentDate,
             final String componentType, final String componentName) {
         addField(builder, "statusId", UUID.randomUUID().toString());
@@ -401,23 +401,4 @@ public class SiteToSiteStatusReportingTask extends 
AbstractSiteToSiteReportingTa
         addField(builder, "application", applicationName);
     }
 
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final Long value) {
-        if (value != null) {
-            builder.add(key, value.longValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final Integer value) {
-        if (value != null) {
-            builder.add(key, value.intValue());
-        }
-    }
-
-    private static void addField(final JsonObjectBuilder builder, final String 
key, final String value) {
-        if (value == null) {
-            return;
-        }
-
-        builder.add(key, value);
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index 0aced94..652b581 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -15,4 +15,5 @@
 
 org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask
 org.apache.nifi.reporting.SiteToSiteBulletinReportingTask
-org.apache.nifi.reporting.SiteToSiteStatusReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.SiteToSiteStatusReportingTask
+org.apache.nifi.reporting.SiteToSiteMetricsReportingTask
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html
new file mode 100644
index 0000000..8120d6a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html
@@ -0,0 +1,178 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>SiteToSiteMetricsReportingTask</title>
+
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+       <p>
+               The Site-to-Site Metrics Reporting Task allows the user to 
publish NiFi's metrics (as in the Ambari reporting task) to the 
+               same NiFi instance or another NiFi instance. This provides a 
great deal of power because it allows the user to make use of
+               all of the different Processors that are available in NiFi in 
order to process or distribute that data.
+       </p>
+       
+       <h2>Ambari format</h2>
+       
+       <p>
+               There are two available output formats. The first one is the Ambari format as defined in the Ambari Metrics Collector
+               API, which is JSON with dynamic keys. If you use this format, you may be interested in the Jolt specification below,
+               which transforms the data.
+       </p>
+       
+               <pre>
+                       <code>
+                       [
+                         {
+                           "operation": "shift",
+                           "spec": {
+                             "metrics": {
+                               "*": {
+                                 "metrics": {
+                                   "*": {
+                                     "$": "metrics.[#4].metrics.time",
+                                     "@": "metrics.[#4].metrics.value"
+                                   }
+                                 },
+                                 "*": "metrics.[&1].&"
+                               }
+                             }
+                           }
+                         }
+                       ]
+                       </code>
+               </pre>
+               
+               <p>
+               This would transform the below sample:
+       </p>
+       
+               <pre>
+                       <code>
+                       {
+                               "metrics": [{
+                                       "metricname": 
"jvm.gc.time.G1OldGeneration",
+                                       "appid": "nifi",
+                                       "instanceid": 
"8927f4c0-0160-1000-597a-ea764ccd81a7",
+                                       "hostname": "localhost",
+                                       "timestamp": "1520456854361",
+                                       "starttime": "1520456854361",
+                                       "metrics": {
+                                               "1520456854361": "0"
+                                       }
+                               }, {
+                                       "metricname": 
"jvm.thread_states.terminated",
+                                       "appid": "nifi",
+                                       "instanceid": 
"8927f4c0-0160-1000-597a-ea764ccd81a7",
+                                       "hostname": "localhost",
+                                       "timestamp": "1520456854361",
+                                       "starttime": "1520456854361",
+                                       "metrics": {
+                                               "1520456854361": "0"
+                                       }
+                               }]
+                       }
+                       </code>
+               </pre>
+
+               <p>
+               into:
+       </p>
+       
+               <pre>
+                       <code>
+                       {
+                               "metrics": [{
+                                       "metricname": 
"jvm.gc.time.G1OldGeneration",
+                                       "appid": "nifi",
+                                       "instanceid": 
"8927f4c0-0160-1000-597a-ea764ccd81a7",
+                                       "hostname": "localhost",
+                                       "timestamp": "1520456854361",
+                                       "starttime": "1520456854361",
+                                       "metrics": {
+                                               "time": "1520456854361",
+                                               "value": "0"
+                                       }
+                               }, {
+                                       "metricname": 
"jvm.thread_states.terminated",
+                                       "appid": "nifi",
+                                       "instanceid": 
"8927f4c0-0160-1000-597a-ea764ccd81a7",
+                                       "hostname": "localhost",
+                                       "timestamp": "1520456854361",
+                                       "starttime": "1520456854361",
+                                       "metrics": {
+                                               "time": "1520456854361",
+                                               "value": "0"
+                                       }
+                               }]
+                       }
+                       </code>
+               </pre>
+       
+       <h2>Record format</h2>
+       
+       <p>
+               The second format leverages the record framework of NiFi: the user configures a Record Writer and directly
+               specifies the output format and data, with the assumption that the input schema is the following:
+       </p>
+
+               <pre>
+                       <code>
+                       {
+                         "type" : "record",
+                         "name" : "metrics",
+                         "namespace" : "metrics",
+                         "fields" : [ 
+                               { "name" : "appid", "type" : "string" },
+                               { "name" : "instanceid", "type" : "string" },
+                               { "name" : "hostname", "type" : "string" },
+                               { "name" : "timestamp", "type" : "long" },
+                               { "name" : "loadAverage1min", "type" : "double" 
},
+                               { "name" : "availableCores", "type" : "int" },
+                               { "name" : "FlowFilesReceivedLast5Minutes", 
"type" : "int" },
+                               { "name" : "BytesReceivedLast5Minutes", "type" 
: "long" },
+                               { "name" : "FlowFilesSentLast5Minutes", "type" 
: "int" },
+                               { "name" : "BytesSentLast5Minutes", "type" : 
"long" },
+                               { "name" : "FlowFilesQueued", "type" : "int" },
+                               { "name" : "BytesQueued", "type" : "long" },
+                               { "name" : "BytesReadLast5Minutes", "type" : 
"long" },
+                               { "name" : "BytesWrittenLast5Minutes", "type" : 
"long" },
+                               { "name" : "ActiveThreads", "type" : "int" },
+                               { "name" : "TotalTaskDurationSeconds", "type" : 
"long" },
+                               { "name" : "TotalTaskDurationNanoSeconds", 
"type" : "long" },
+                               { "name" : "jvmuptime", "type" : "long" },
+                               { "name" : "jvmheap_used", "type" : "double" },
+                               { "name" : "jvmheap_usage", "type" : "double" },
+                               { "name" : "jvmnon_heap_usage", "type" : 
"double" },
+                               { "name" : "jvmthread_statesrunnable", "type" : 
["int", "null"] },
+                               { "name" : "jvmthread_statesblocked", "type" : 
["int", "null"] },
+                               { "name" : "jvmthread_statestimed_waiting", 
"type" : ["int", "null"] },
+                               { "name" : "jvmthread_statesterminated", "type" 
: ["int", "null"] },
+                               { "name" : "jvmthread_count", "type" : "int" },
+                               { "name" : "jvmdaemon_thread_count", "type" : 
"int" },
+                               { "name" : "jvmfile_descriptor_usage", "type" : 
"double" },
+                               { "name" : "jvmgcruns", "type" : ["long", 
"null"] },
+                               { "name" : "jvmgctime", "type" : ["long", 
"null"] }
+                         ]
+                       }
+                       </code>
+               </pre>
+
+       </body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
index e1841b2..86736a6 100644
--- 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html
@@ -25,7 +25,7 @@
        <p>
                The Site-to-Site Provenance Reporting Task allows the user to 
publish all of the Provenance Events from a NiFi instance back to
                the same NiFi instance or another NiFi instance. This provides 
a great deal of power because it allows the user to make use of
-               all of the different Processors that are available in NiFi in 
order to processor or distribute that data. When possible, it is
+               all of the different Processors that are available in NiFi in 
order to process or distribute that data. When possible, it is
                advisable to send the Provenance data to a different NiFi 
instance than the one that this Reporting Task is running on, because
                when the data is received over Site-to-Site and processed, that 
in and of itself will generate Provenance events. As a result, there
                is a cycle that is created. However, the data is sent in 
batches (1,000 by default). This means that for each batch of Provenance events

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc
new file mode 100644
index 0000000..90dea10
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc
@@ -0,0 +1,37 @@
+{
+  "type" : "record",
+  "name" : "metrics",
+  "namespace" : "metrics",
+  "fields" : [ 
+       { "name" : "appid", "type" : "string" },
+       { "name" : "instanceid", "type" : "string" },
+       { "name" : "hostname", "type" : "string" },
+       { "name" : "timestamp", "type" : "long" },
+       { "name" : "loadAverage1min", "type" : "double" },
+       { "name" : "availableCores", "type" : "int" },
+       { "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" },
+       { "name" : "BytesReceivedLast5Minutes", "type" : "long" },
+       { "name" : "FlowFilesSentLast5Minutes", "type" : "int" },
+       { "name" : "BytesSentLast5Minutes", "type" : "long" },
+       { "name" : "FlowFilesQueued", "type" : "int" },
+       { "name" : "BytesQueued", "type" : "long" },
+       { "name" : "BytesReadLast5Minutes", "type" : "long" },
+       { "name" : "BytesWrittenLast5Minutes", "type" : "long" },
+       { "name" : "ActiveThreads", "type" : "int" },
+       { "name" : "TotalTaskDurationSeconds", "type" : "long" },
+       { "name" : "TotalTaskDurationNanoSeconds", "type" : "long" },
+       { "name" : "jvmuptime", "type" : "long" },
+       { "name" : "jvmheap_used", "type" : "double" },
+       { "name" : "jvmheap_usage", "type" : "double" },
+       { "name" : "jvmnon_heap_usage", "type" : "double" },
+       { "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] },
+       { "name" : "jvmthread_statesblocked", "type" : ["int", "null"] },
+       { "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] },
+       { "name" : "jvmthread_statesterminated", "type" : ["int", "null"] },
+       { "name" : "jvmthread_count", "type" : "int" },
+       { "name" : "jvmdaemon_thread_count", "type" : "int" },
+       { "name" : "jvmfile_descriptor_usage", "type" : "double" },
+       { "name" : "jvmgcruns", "type" : ["long", "null"] },
+       { "name" : "jvmgctime", "type" : ["long", "null"] }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6fbe1515/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
new file mode 100644
index 0000000..c699a1c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.reporting;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+import javax.json.JsonValue;
+
+import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.state.MockStateManager;
+import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.TestRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestSiteToSiteMetricsReportingTask {
+
+    private ReportingContext context;
+    private ProcessGroupStatus status;
+    private TestRunner runner;
+
+    /**
+     * Builds the root process group status used by the tests: fixed flow file and
+     * byte counters, one processor status carrying a processing time, and one child
+     * group that reuses the same processor status collection.
+     */
+    @Before
+    public void setup() {
+        // a processor status with processing time, shared by the root and the child group
+        final ProcessorStatus timedProcessor = new ProcessorStatus();
+        timedProcessor.setProcessingNanos(123456789);
+
+        final Collection<ProcessorStatus> processors = new ArrayList<>();
+        processors.add(timedProcessor);
+
+        // a child group status with processing time
+        final ProcessGroupStatus childGroup = new ProcessGroupStatus();
+        childGroup.setProcessorStatus(processors);
+
+        final Collection<ProcessGroupStatus> childGroups = new ArrayList<>();
+        childGroups.add(childGroup);
+
+        // the root group status exposed to the reporting task
+        final ProcessGroupStatus root = new ProcessGroupStatus();
+        root.setId("1234");
+        root.setFlowFilesReceived(5);
+        root.setBytesReceived(10000);
+        root.setFlowFilesSent(10);
+        root.setBytesSent(20000);
+        root.setQueuedCount(100);
+        root.setQueuedContentSize(1024L);
+        root.setBytesRead(60000L);
+        root.setBytesWritten(80000L);
+        root.setActiveThreadCount(5);
+        root.setProcessorStatus(processors);
+        root.setProcessGroupStatus(childGroups);
+
+        status = root;
+    }
+
+    /**
+     * Creates a mock reporting task and rebuilds the shared {@code context} mock around it.
+     * Property lookups on the context resolve against the task's default property values
+     * overlaid with {@code customProperties}; the record writer property is stubbed
+     * separately so that it resolves to a {@link MockRecordWriter}.
+     *
+     * @param customProperties property values that override the descriptors' defaults
+     * @return the initialized mock task
+     * @throws InitializationException if the task initialization fails
+     * @throws IOException if the task cannot be constructed
+     */
+    public MockSiteToSiteMetricsReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
+
+        final MockSiteToSiteMetricsReportingTask reportingTask = new MockSiteToSiteMetricsReportingTask();
+
+        // defaults first, then the caller's overrides
+        final Map<PropertyDescriptor, String> effectiveProperties = new HashMap<>();
+        reportingTask.getSupportedPropertyDescriptors()
+                .forEach(descriptor -> effectiveProperties.put(descriptor, descriptor.getDefaultValue()));
+        effectiveProperties.putAll(customProperties);
+
+        context = Mockito.mock(ReportingContext.class);
+        Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask));
+
+        // generic lookup: any descriptor resolves to its configured (or default) value
+        final Answer<PropertyValue> propertyLookup = invocation ->
+                new MockPropertyValue(effectiveProperties.get(invocation.getArgumentAt(0, PropertyDescriptor.class)));
+        Mockito.doAnswer(propertyLookup).when(context).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        final EventAccess events = Mockito.mock(EventAccess.class);
+        Mockito.when(context.getEventAccess()).thenReturn(events);
+        Mockito.when(events.getControllerStatus()).thenReturn(status);
+
+        // stubbed after the generic lookup so that it takes precedence for the record writer property
+        final PropertyValue writerProperty = Mockito.mock(StandardPropertyValue.class);
+        final MockRecordWriter recordWriter = new MockRecordWriter();
+        Mockito.when(context.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(writerProperty);
+        Mockito.when(writerProperty.asControllerService(RecordSetWriterFactory.class)).thenReturn(recordWriter);
+
+        final ReportingInitializationContext initializationContext = Mockito.mock(ReportingInitializationContext.class);
+        final ComponentLog componentLog = Mockito.mock(ComponentLog.class);
+        Mockito.when(initializationContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        Mockito.when(initializationContext.getLogger()).thenReturn(componentLog);
+        reportingTask.initialize(initializationContext);
+
+        return reportingTask;
+    }
+
+    /**
+     * Validation must report exactly one problem, attributed to the record writer
+     * property, when the Ambari format is selected and a record writer is also set.
+     */
+    @Test
+    public void testValidationBothAmbariFormatRecordWriter() throws IOException {
+        ValidationContext validationContext = Mockito.mock(ValidationContext.class);
+        final String urlEL = "http://${hostname(true)}:8080/nifi";
+        final String url = "http://localhost:8080/nifi";
+
+        final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask();
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+
+        properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue());
+        properties.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, url);
+        properties.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, url);
+        properties.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port");
+
+        // both the literal URL and the expression-language URL resolve to the same literal value
+        final PropertyValue pValueUrl = Mockito.mock(StandardPropertyValue.class);
+        Mockito.when(validationContext.newPropertyValue(url)).thenReturn(pValueUrl);
+        Mockito.when(validationContext.newPropertyValue(urlEL)).thenReturn(pValueUrl);
+        Mockito.when(pValueUrl.evaluateAttributeExpressions()).thenReturn(pValueUrl);
+        Mockito.when(pValueUrl.getValue()).thenReturn(url);
+
+        Mockito.doAnswer(new Answer<PropertyValue>() {
+            @Override
+            public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
+                final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+                return new MockPropertyValue(properties.get(descriptor));
+            }
+        }).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        // stubbed after the generic lookup so that the record writer property reports itself as set
+        final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
+        Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue);
+        Mockito.when(pValue.isSet()).thenReturn(true);
+
+        // should be invalid because both ambari format and record writer are set
+        Collection<ValidationResult> list = task.validate(validationContext);
+        Assert.assertEquals(1, list.size());
+        Assert.assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), list.iterator().next().getInput());
+    }
+
+    /**
+     * Validation must report exactly one problem, attributed to the record writer
+     * property, when the record format is selected but no record writer is set.
+     */
+    @Test
+    public void testValidationRecordFormatNoRecordWriter() throws IOException {
+        ValidationContext validationContext = Mockito.mock(ValidationContext.class);
+        final String urlEL = "http://${hostname(true)}:8080/nifi";
+        final String url = "http://localhost:8080/nifi";
+
+        final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask();
+        Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+
+        properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue());
+        properties.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, url);
+        properties.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, url);
+        properties.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port");
+
+        // both the literal URL and the expression-language URL resolve to the same literal value
+        final PropertyValue pValueUrl = Mockito.mock(StandardPropertyValue.class);
+        Mockito.when(validationContext.newPropertyValue(url)).thenReturn(pValueUrl);
+        Mockito.when(validationContext.newPropertyValue(urlEL)).thenReturn(pValueUrl);
+        Mockito.when(pValueUrl.evaluateAttributeExpressions()).thenReturn(pValueUrl);
+        Mockito.when(pValueUrl.getValue()).thenReturn(url);
+
+        Mockito.doAnswer(new Answer<PropertyValue>() {
+            @Override
+            public PropertyValue answer(final InvocationOnMock invocation) throws Throwable {
+                final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class);
+                return new MockPropertyValue(properties.get(descriptor));
+            }
+        }).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class));
+
+        // stubbed after the generic lookup so that the record writer property reports itself as not set
+        final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
+        Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue);
+        Mockito.when(pValue.isSet()).thenReturn(false);
+
+        // should be invalid because the record format is selected but no record writer is set
+        Collection<ValidationResult> list = task.validate(validationContext);
+        Assert.assertEquals(1, list.size());
+        Assert.assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), list.iterator().next().getInput());
+    }
+
+    /**
+     * Triggers the task with the Ambari format and checks the single JSON payload sent:
+     * every metric must carry the expected application and instance ids, and the
+     * {@code FlowFilesQueued} metric must be present with the value set in {@code setup()}.
+     */
+    @Test
+    public void testAmbariFormat() throws IOException, InitializationException {
+
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue());
+
+        MockSiteToSiteMetricsReportingTask task = initTask(properties);
+        task.onTrigger(context);
+
+        assertEquals(1, task.dataSent.size());
+        final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
+
+        // encode with the same charset that was used for decoding, and release the reader once parsed
+        final JsonArray array;
+        try (JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes(StandardCharsets.UTF_8)))) {
+            array = jsonReader.readObject().getJsonArray("metrics");
+        }
+
+        for (int i = 0; i < array.size(); i++) {
+            JsonObject object = array.getJsonObject(i);
+            assertEquals("nifi", object.getString("appid"));
+            assertEquals("1234", object.getString("instanceid"));
+            if (object.getString("metricname").equals("FlowFilesQueued")) {
+                for (Entry<String, JsonValue> kv : object.getJsonObject("metrics").entrySet()) {
+                    assertEquals("\"100\"", kv.getValue().toString());
+                }
+                return;
+            }
+        }
+
+        // the FlowFilesQueued metric was not found in the payload
+        fail();
+    }
+
+    /**
+     * Triggers the task with the record format and a mock record writer, then checks
+     * selected comma-separated columns of the single payload sent.
+     */
+    @Test
+    public void testRecordFormat() throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue());
+        properties.put(SiteToSiteMetricsReportingTask.RECORD_WRITER, "record-writer");
+        MockSiteToSiteMetricsReportingTask task = initTask(properties);
+
+        task.onTrigger(context);
+
+        assertEquals(1, task.dataSent.size());
+        // decode explicitly as UTF-8 rather than with the platform default charset
+        String[] data = new String(task.dataSent.get(0), StandardCharsets.UTF_8).split(",");
+        assertEquals("\"nifi\"", data[0]);
+        assertEquals("\"1234\"", data[1]);
+        assertEquals("\"100\"", data[10]); // FlowFilesQueued
+    }
+
+    /**
+     * Reporting task whose Site-to-Site client is replaced by a Mockito mock: every
+     * payload passed to {@code Transaction.send(byte[], Map)} is recorded in
+     * {@link #dataSent} instead of being transmitted.
+     */
+    private static final class MockSiteToSiteMetricsReportingTask extends SiteToSiteMetricsReportingTask {
+
+        /** Payloads captured from the mocked transaction, in the order they were sent. */
+        final List<byte[]> dataSent = new ArrayList<>();
+
+        public MockSiteToSiteMetricsReportingTask() throws IOException {
+            super();
+        }
+
+        /**
+         * Returns a mock client whose transactions record the sent bytes.
+         *
+         * @return the mocked Site-to-Site client
+         */
+        @Override
+        protected SiteToSiteClient getClient() {
+            final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class);
+            final Transaction transaction = Mockito.mock(Transaction.class);
+
+            try {
+                Mockito.doAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(final InvocationOnMock invocation) throws Throwable {
+                        final byte[] data = invocation.getArgumentAt(0, byte[].class);
+                        dataSent.add(data);
+                        return null;
+                    }
+                }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
+
+                Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+            } catch (final Exception e) {
+                // fail the test with the original exception attached as the cause
+                throw new AssertionError(e.toString(), e);
+            }
+
+            return client;
+        }
+    }
+
+}

Reply via email to