[
https://issues.apache.org/jira/browse/NIFI-1337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15119724#comment-15119724
]
ASF GitHub Bot commented on NIFI-1337:
--------------------------------------
Github user rickysaltzer commented on a diff in the pull request:
https://github.com/apache/nifi/pull/188#discussion_r51010255
--- Diff:
nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-reporting-task/src/main/java/org/apache/nifi/reporting/riemann/RiemannReportingTask.java
---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.riemann;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.riemann.metrics.MetricsService;
+
+import com.aphyr.riemann.Proto;
+import com.aphyr.riemann.client.IPromise;
+import com.aphyr.riemann.client.RiemannClient;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.yammer.metrics.core.VirtualMachineMetrics;
+
+@Tags({ "reporting", "riemann", "metrics" })
+@DynamicProperty(name = "Attribute Name", value = "Attribute Value",
supportsExpressionLanguage = false,
+ description = "Additional attributes may be attached to the event
by adding dynamic properties")
+@CapabilityDescription("Publish NiFi metrics to Riemann. These metrics
include " + "JVM, Processor, and General Data Flow metrics. In addition, you
may also forward bulletin " + "board messages.")
+public class RiemannReportingTask extends AbstractReportingTask {
+ public static final PropertyDescriptor RIEMANN_HOST = new
PropertyDescriptor.Builder().name("Riemann Address").description("Hostname of
Riemann server").required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+ public static final PropertyDescriptor RIEMANN_PORT = new
PropertyDescriptor.Builder().name("Riemann Port").description("Port that
Riemann is listening on").required(true).defaultValue("5555")
+ .addValidator(StandardValidators.PORT_VALIDATOR).build();
+ public static final PropertyDescriptor TRANSPORT_PROTOCOL = new
PropertyDescriptor.Builder().name("Transport Protocol").description("Transport
protocol to speak to Riemann in").required(true)
+ .allowableValues(new Transport[] { Transport.TCP,
Transport.UDP }).defaultValue("TCP").build();
+ public static final PropertyDescriptor SERVICE_PREFIX = new
PropertyDescriptor.Builder().name("Prefix for Service
Name").description("Prefix to use when reporting to
Riemann").defaultValue("nifi")
+
.required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+ public static final PropertyDescriptor WRITE_TIMEOUT = new
PropertyDescriptor.Builder().name("Timeout").description("Timeout in
milliseconds when writing events to Riemann").required(true)
+
.defaultValue("500ms").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
+ static final PropertyDescriptor HOSTNAME = new
PropertyDescriptor.Builder().name("Hostname").description("The Hostname of this
NiFi instance to report to Riemann").required(true)
+
.expressionLanguageSupported(true).defaultValue("${hostname(true)}").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+ static final PropertyDescriptor TAGS = new
PropertyDescriptor.Builder().name("Tags").description("Comma separated list of
tags to include ").required(true).expressionLanguageSupported(true)
+
.defaultValue("nifi,metrics").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
+ static final PropertyDescriptor SEND_JVM_METRICS = new
PropertyDescriptor.Builder().name("JVM Metrics").description("Forwards NiFi JVM
metrics to Riemann").allowableValues("true", "false")
+
.required(true).defaultValue("true").addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
+ static final PropertyDescriptor SEND_NIFI_METRICS = new
PropertyDescriptor.Builder().name("NiFi Metrics").description("Forwards
aggregated data flow metrics to Riemann")
+ .allowableValues("true",
"false").required(true).defaultValue("true").addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
+ static final PropertyDescriptor SEND_PROCESSOR_METRICS = new
PropertyDescriptor.Builder().name("Processor Metrics").description("Forwards
metrics for individual processor to Riemann")
+ .allowableValues("true",
"false").required(true).defaultValue("true").addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
+ static final PropertyDescriptor SEND_BULLETIN_MESSAGES = new
PropertyDescriptor.Builder().name("Bulletin Messages").description("Forwards
messages from the Bulletin board to Riemann")
+ .allowableValues("true",
"false").required(true).defaultValue("true").addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
+ static final PropertyDescriptor MIN_BULLETIN_LEVEL = new
PropertyDescriptor.Builder().name("Minimum Bulletin Level")
+ .description("Only forward bulletin messages at this level and
above to
Riemann").allowableValues(LogLevel.values()).required(true).defaultValue("WARNING").build();
+ protected volatile Transport transport;
+ private volatile long lastObservedBulletinId = 0;
+ private volatile RiemannClient riemannClient = null;
+ private volatile MetricsService metricsService;
+ private volatile VirtualMachineMetrics virtualMachineMetrics =
VirtualMachineMetrics.getInstance();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = Lists.newArrayList();
+ properties.add(RIEMANN_HOST);
+ properties.add(RIEMANN_PORT);
+ properties.add(TRANSPORT_PROTOCOL);
+ properties.add(SERVICE_PREFIX);
+ properties.add(HOSTNAME);
+ properties.add(TAGS);
+ properties.add(SEND_JVM_METRICS);
+ properties.add(SEND_NIFI_METRICS);
+ properties.add(SEND_PROCESSOR_METRICS);
+ properties.add(SEND_BULLETIN_MESSAGES);
+ properties.add(MIN_BULLETIN_LEVEL);
+ properties.add(WRITE_TIMEOUT);
+ return properties;
+ }
+
+ @Override
+ protected PropertyDescriptor
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new
PropertyDescriptor.Builder().name(propertyDescriptorName).required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(false).dynamic(true)
+ .build();
+ }
+
+ @OnStopped
+ public final void cleanUpClient() {
+ if (riemannClient != null) {
+ this.riemannClient.close();
+ }
+ this.riemannClient = null;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ConfigurationContext context) throws
ProcessException {
+ if (riemannClient == null || !riemannClient.isConnected()) {
+ transport =
Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
+ String host =
context.getProperty(RIEMANN_HOST).evaluateAttributeExpressions().getValue();
+ int port = context.getProperty(RIEMANN_PORT).asInteger();
+ RiemannClient client = null;
+ try {
+ switch (transport) {
+ case TCP:
+ client = RiemannClient.tcp(host, port);
+ break;
+ case UDP:
+ client = RiemannClient.udp(host, port);
+ break;
+ }
--- End diff --
Since the `TRANSPORT_PROTOCOL` property only allows the `Transport.TCP` and
`Transport.UDP` values, we shouldn't have to worry about handling an additional
value.
> Add Riemann Reporting Task
> --------------------------
>
> Key: NIFI-1337
> URL: https://issues.apache.org/jira/browse/NIFI-1337
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Ricky Saltzer
> Assignee: Ricky Saltzer
> Attachments: example-graph.png, riemann-reporting-configuration.png
>
>
> It would be beneficial for NiFi to report health information to
> [Riemann|http://riemann.io] for monitoring purposes. I plan on implementing
> this, and will use this JIRA to track the progress.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)