[
https://issues.apache.org/jira/browse/NIFI-987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14987806#comment-14987806
]
ASF GitHub Bot commented on NIFI-987:
-------------------------------------
Github user rickysaltzer commented on a diff in the pull request:
https://github.com/apache/nifi/pull/91#discussion_r43786954
--- Diff:
nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java
---
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.riemann;
+
+import com.aphyr.riemann.Proto;
+import com.aphyr.riemann.Proto.Event;
+import com.aphyr.riemann.client.RiemannClient;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"riemann", "monitoring", "metrics"})
+@DynamicProperty(name = "Custom Event Attribute",
supportsExpressionLanguage = true,
+ description = "These values will be attached to the Riemann event as a
custom attribute",
+ value = "Any value or expression")
+@CapabilityDescription("Send events to Riemann")
+@SupportsBatching
+public class PutRiemann extends AbstractProcessor {
+ protected enum Transport {
+ TCP, UDP
+ }
+
+ protected RiemannClient riemannClient = null;
+ protected Transport transport;
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Metrics successfully written to Riemann")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Metrics which failed to write to Riemann")
+ .build();
+
+
+ public static final PropertyDescriptor RIEMANN_HOST = new
PropertyDescriptor.Builder()
+ .name("Riemann Address")
+ .description("Hostname of Riemann server")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor RIEMANN_PORT = new
PropertyDescriptor.Builder()
+ .name("Riemann Port")
+ .description("Port that Riemann is listening on")
+ .required(true)
+ .defaultValue("5555")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TRANSPORT_PROTOCOL = new
PropertyDescriptor.Builder()
+ .name("Transport Protocol")
+ .description("Transport protocol to speak to Riemann in")
+ .required(true)
+ .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
+ .defaultValue("TCP")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("Batch size for incoming FlowFiles")
+ .required(false)
+ .defaultValue("100")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ // Attributes Mappings
+ public static final PropertyDescriptor ATTR_SERVICE = new
PropertyDescriptor.Builder()
+ .name("Service")
+ .description("Name of service for the event")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor ATTR_STATE = new
PropertyDescriptor.Builder()
+ .name("State")
+ .description("State of service for the event (e.g. ok, warning, or
critical)")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor ATTR_TIME = new
PropertyDescriptor.Builder()
+ .name("Time")
+ .description("Time for event")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor ATTR_HOST = new
PropertyDescriptor.Builder()
+ .name("Host")
+ .description("Name of host for the event")
+ .required(false)
+ .defaultValue("${hostname()}")
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor ATTR_TTL = new
PropertyDescriptor.Builder()
+ .name("TTL")
+ .description("Time to live for the event")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor ATTR_METRIC = new
PropertyDescriptor.Builder()
+ .name("Metric")
+ .description("Metric for the event")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor ATTR_DESCRIPTION = new
PropertyDescriptor.Builder()
+ .name("Description")
+ .description("Description for the event")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+
+ public static final PropertyDescriptor ATTR_TAGS = new
PropertyDescriptor.Builder()
+ .name("Tags")
+ .description("Comma separated list of tags for the event")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Timeout")
+ .description("Timeout in milliseconds when writing events to Riemann")
+ .required(true)
+ .defaultValue("1000")
+ .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+ .build();
+
+ private List<PropertyDescriptor> customAttributes = new ArrayList<>();
+ private static final Set<Relationship> relationships = new HashSet<>();
+ private static final List<PropertyDescriptor> localProperties = new
ArrayList<>();
+
+ private int batchSize = -1;
+ private long writeTimeout = 1000;
+
+ static {
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ localProperties.add(RIEMANN_HOST);
+ localProperties.add(RIEMANN_PORT);
+ localProperties.add(TRANSPORT_PROTOCOL);
+ localProperties.add(TIMEOUT);
+ localProperties.add(BATCH_SIZE);
+ localProperties.add(ATTR_DESCRIPTION);
+ localProperties.add(ATTR_SERVICE);
+ localProperties.add(ATTR_STATE);
+ localProperties.add(ATTR_METRIC);
+ localProperties.add(ATTR_TTL);
+ localProperties.add(ATTR_TAGS);
+ localProperties.add(ATTR_HOST);
+ localProperties.add(ATTR_TIME);
+ }
+
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return localProperties;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .required(false)
+ .dynamic(true)
+ .build();
+ }
+
+ @OnStopped
+ public final void cleanUpClient() {
+ if (riemannClient != null) {
+ this.riemannClient.close();
+ }
+ this.riemannClient = null;
+ this.batchSize = -1;
+ this.customAttributes.clear();
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws ProcessException {
+ if (batchSize == -1) {
+ batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ }
+ if (riemannClient == null || !riemannClient.isConnected()) {
+ transport =
Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
+ String host = context.getProperty(RIEMANN_HOST).getValue().trim();
+ int port = context.getProperty(RIEMANN_PORT).asInteger();
+ writeTimeout = context.getProperty(TIMEOUT).asLong();
+ RiemannClient client = null;
+ try {
+ switch (transport) {
+ case TCP:
+ client = RiemannClient.tcp(host, port);
+ break;
+ case UDP:
+ client = RiemannClient.udp(host, port);
+ break;
+ }
+ client.connect();
+ riemannClient = client;
+ } catch (IOException e) {
+ context.yield();
+ throw new ProcessException(String.format("Unable to connect to
Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
+ }
+ }
+
+ if (customAttributes.size() == 0) {
+ for (Map.Entry<PropertyDescriptor, String> property :
context.getProperties().entrySet()) {
+ // only custom defined properties
+ if
(!getSupportedPropertyDescriptors().contains(property.getKey())) {
+ customAttributes.add(property.getKey());
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ // Check if the client is currently connected, as a previous trigger
could have detected a failure
+ // in the connection.
+ if (riemannClient == null || !riemannClient.isConnected()) {
+ // clean up the client and attempt to re-initialize the processor
+ cleanUpClient();
+ onScheduled(context);
+ }
+
+ List<FlowFile> incomingFlowFiles = session.get(batchSize);
+ List<FlowFile> successfulFlowFiles = new
ArrayList<>(incomingFlowFiles.size());
+ List<Event> eventsQueue = new ArrayList<>(incomingFlowFiles.size());
+ for (FlowFile flowFile : incomingFlowFiles) {
+ try {
+ eventsQueue.add(FlowFileToEvent.fromAttributes(context,
customAttributes, flowFile));
+ successfulFlowFiles.add(flowFile);
+ } catch (NumberFormatException e) {
+ getLogger().warn(e.getMessage());
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+ try {
+ if (transport == Transport.TCP) {
+ Proto.Msg returnMessage =
riemannClient.sendEvents(eventsQueue).deref(writeTimeout,
TimeUnit.MILLISECONDS);
+ if (returnMessage == null) {
+ context.yield();
+ throw new ProcessException("Timed out writing to Riemann!");
+ }
+ } else {
+ riemannClient.sendEvents(eventsQueue);
+ }
+ riemannClient.flush();
+ } catch (Exception e) {
+ context.yield();
+ throw new ProcessException("Failed writing to Riemann\n" +
e.getMessage());
+ }
+ session.transfer(successfulFlowFiles, REL_SUCCESS);
+ session.commit();
+ }
+
+ /**
+ * Converts a FlowFile into a Riemann Protobuf Event
+ */
+ private static class FlowFileToEvent {
+ protected static Event fromAttributes(ProcessContext context,
List<PropertyDescriptor> customProperties,
+ FlowFile flowFile) {
+ Event.Builder builder = Event.newBuilder();
+
+ PropertyValue service =
context.getProperty(ATTR_SERVICE).evaluateAttributeExpressions(flowFile);
+ if (service.getValue() != null && !service.getValue().equals("")) {
+ builder.setService(service.getValue());
+ }
+ PropertyValue description =
context.getProperty(ATTR_DESCRIPTION).evaluateAttributeExpressions(flowFile);
+ if (description.getValue() != null &&
!description.getValue().equals("")) {
+ builder.setDescription(description.getValue());
+ }
+ PropertyValue metric =
context.getProperty(ATTR_METRIC).evaluateAttributeExpressions(flowFile);
+ if (metric.getValue() != null && !metric.getValue().equals("")) {
+ builder.setMetricF(metric.asFloat());
+ }
+ PropertyValue time =
context.getProperty(ATTR_TIME).evaluateAttributeExpressions(flowFile);
+ if (time.getValue() != null && !time.getValue().equals("")) {
+ builder.setTime(time.asLong());
+ }
+ PropertyValue state =
context.getProperty(ATTR_STATE).evaluateAttributeExpressions(flowFile);
+ if (state.getValue() != null && !state.getValue().equals("")) {
+ builder.setState(state.getValue());
+ }
+ PropertyValue ttl =
context.getProperty(ATTR_TTL).evaluateAttributeExpressions(flowFile);
+ if (ttl.getValue() != null && !ttl.getValue().equals("")) {
+ builder.setTtl(ttl.asFloat());
+ }
+ PropertyValue host =
context.getProperty(ATTR_HOST).evaluateAttributeExpressions(flowFile);
+ if (host.getValue() != null && !host.getValue().equals("")) {
+ builder.setHost(host.getValue());
+ }
+ PropertyValue tags =
context.getProperty(ATTR_TAGS).evaluateAttributeExpressions(flowFile);
+ if (tags.getValue() != null && !tags.getValue().equals("")) {
--- End diff --
I believe so; I can add that check.
> Add Processor for Writing Events to Riemann
> -------------------------------------------
>
> Key: NIFI-987
> URL: https://issues.apache.org/jira/browse/NIFI-987
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Ricky Saltzer
> Assignee: Ricky Saltzer
> Fix For: 0.4.0
>
> Attachments: Sample Riemann Dataflow .png
>
>
> Riemann (http://riemann.io) is a new framework for monitoring distributed
> systems. It's particularly useful for sending ad-hoc events such as heartbeats
> and metrics. It would be nice if NiFi had a PutRiemann processor for writing
> events using the NiFi expression language.
> A simple use case would be a data flow that repeatedly checks specific
> services over TCP, HTTP, etc., and checks in to Riemann. Another example would
> be detecting a blip in events coming down a stream using a simple event
> heartbeat mechanism. I'll post a couple of visuals to help make these examples
> more concrete.
> I have an initial PutRiemann processor made. I will post the patch via a
> Github pull request later today.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)