[ 
https://issues.apache.org/jira/browse/NIFI-987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990105#comment-14990105
 ] 

ASF GitHub Bot commented on NIFI-987:
-------------------------------------

Github user apiri commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/91#discussion_r43918298
  
    --- Diff: 
nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java
 ---
    @@ -0,0 +1,376 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.riemann;
    +
    +import com.aphyr.riemann.Proto;
    +import com.aphyr.riemann.Proto.Event;
    +import com.aphyr.riemann.client.RiemannClient;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.components.Validator;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +
    +@Tags({"riemann", "monitoring", "metrics"})
    +@DynamicProperty(name = "Custom Event Attribute", 
supportsExpressionLanguage = true,
    +  description = "These values will be attached to the Riemann event as a 
custom attribute",
    +  value = "Any value or expression")
    +@CapabilityDescription("Send events to Riemann")
    +@SupportsBatching
    +public class PutRiemann extends AbstractProcessor {
    +  protected enum Transport {
    +    TCP, UDP
    +  }
    +
    +  protected RiemannClient riemannClient = null;
    +  protected Transport transport;
    +
    +  public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +    .name("success")
    +    .description("Metrics successfully written to Riemann")
    +    .build();
    +
    +  public static final Relationship REL_FAILURE = new Relationship.Builder()
    +    .name("failure")
    +    .description("Metrics which failed to write to Riemann")
    +    .build();
    +
    +
    +  public static final PropertyDescriptor RIEMANN_HOST = new 
PropertyDescriptor.Builder()
    +    .name("Riemann Address")
    +    .description("Hostname of Riemann server")
    +    .required(true)
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor RIEMANN_PORT = new 
PropertyDescriptor.Builder()
    +    .name("Riemann Port")
    +    .description("Port that Riemann is listening on")
    +    .required(true)
    +    .defaultValue("5555")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor TRANSPORT_PROTOCOL = new 
PropertyDescriptor.Builder()
    +    .name("Transport Protocol")
    +    .description("Transport protocol to speak to Riemann in")
    +    .required(true)
    +    .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
    +    .defaultValue("TCP")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .build();
    +
    +  public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +    .name("Batch Size")
    +    .description("Batch size for incoming FlowFiles")
    +    .required(false)
    +    .defaultValue("100")
    +    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +    .addValidator(StandardValidators.INTEGER_VALIDATOR)
    +    .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +    .build();
    +
    +  // Attributes Mappings
    +  public static final PropertyDescriptor ATTR_SERVICE = new 
PropertyDescriptor.Builder()
    +    .name("Service")
    +    .description("Name of service for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_STATE = new 
PropertyDescriptor.Builder()
    +    .name("State")
    +    .description("State of service for the event (e.g. ok, warning, or 
critical)")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TIME = new 
PropertyDescriptor.Builder()
    +    .name("Time")
    +    .description("Time for event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_HOST = new 
PropertyDescriptor.Builder()
    +    .name("Host")
    +    .description("Name of host for the event")
    +    .required(false)
    +    .defaultValue("${hostname()}")
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_TTL = new 
PropertyDescriptor.Builder()
    +    .name("TTL")
    +    .description("Time to live for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_METRIC = new 
PropertyDescriptor.Builder()
    +    .name("Metric")
    +    .description("Metric for the event")
    +    .required(false)
    +    .addValidator(Validator.VALID)
    +    .expressionLanguageSupported(true)
    +    .build();
    +
    +  public static final PropertyDescriptor ATTR_DESCRIPTION = new 
PropertyDescriptor.Builder()
    +    .name("Description")
    +    .description("Description for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +
    +  public static final PropertyDescriptor ATTR_TAGS = new 
PropertyDescriptor.Builder()
    +    .name("Tags")
    +    .description("Comma separated list of tags for the event")
    +    .required(false)
    +    .expressionLanguageSupported(true)
    +    .addValidator(Validator.VALID)
    +    .build();
    +
    +  public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
    +    .name("Timeout")
    +    .description("Timeout in milliseconds when writing events to Riemann")
    +    .required(true)
    +    .defaultValue("1000")
    +    .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
    +    .build();
    +
    +  private List<PropertyDescriptor> customAttributes = new ArrayList<>();
    +  private static final Set<Relationship> relationships = new HashSet<>();
    +  private static final List<PropertyDescriptor> localProperties = new 
ArrayList<>();
    +
    +  private int batchSize = -1;
    +  private long writeTimeout = 1000;
    +
    +  static {
    +    relationships.add(REL_SUCCESS);
    +    relationships.add(REL_FAILURE);
    +    localProperties.add(RIEMANN_HOST);
    +    localProperties.add(RIEMANN_PORT);
    +    localProperties.add(TRANSPORT_PROTOCOL);
    +    localProperties.add(TIMEOUT);
    +    localProperties.add(BATCH_SIZE);
    +    localProperties.add(ATTR_DESCRIPTION);
    +    localProperties.add(ATTR_SERVICE);
    +    localProperties.add(ATTR_STATE);
    +    localProperties.add(ATTR_METRIC);
    +    localProperties.add(ATTR_TTL);
    +    localProperties.add(ATTR_TAGS);
    +    localProperties.add(ATTR_HOST);
    +    localProperties.add(ATTR_TIME);
    +  }
    +
    +
    +  @Override
    +  public Set<Relationship> getRelationships() {
    +    return relationships;
    +  }
    +
    +  @Override
    +  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +    return localProperties;
    +  }
    +
    +  @Override
    +  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
    +    return new PropertyDescriptor.Builder()
    +      .name(propertyDescriptorName)
    +      .expressionLanguageSupported(true)
    +      .addValidator(Validator.VALID)
    +      .required(false)
    +      .dynamic(true)
    +      .build();
    +  }
    +
    +  @OnStopped
    +  public final void cleanUpClient() {
    +    if (riemannClient != null) {
    +      this.riemannClient.close();
    +    }
    +    this.riemannClient = null;
    +    this.batchSize = -1;
    +    this.customAttributes.clear();
    +  }
    +
    +  @OnScheduled
    +  public void onScheduled(ProcessContext context) throws ProcessException {
    +    if (batchSize == -1) {
    +      batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +    }
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      transport = 
Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
    +      String host = context.getProperty(RIEMANN_HOST).getValue().trim();
    +      int port = context.getProperty(RIEMANN_PORT).asInteger();
    +      writeTimeout = context.getProperty(TIMEOUT).asLong();
    +      RiemannClient client = null;
    +      try {
    +        switch (transport) {
    +          case TCP:
    +            client = RiemannClient.tcp(host, port);
    +            break;
    +          case UDP:
    +            client = RiemannClient.udp(host, port);
    +            break;
    +        }
    +        client.connect();
    +        riemannClient = client;
    +      } catch (IOException e) {
    +        context.yield();
    +        throw new ProcessException(String.format("Unable to connect to 
Riemann [%s:%d] (%s)\n%s", host, port, transport, e.getMessage()));
    +      }
    +    }
    +
    +    if (customAttributes.size() == 0) {
    +      for (Map.Entry<PropertyDescriptor, String> property : 
context.getProperties().entrySet()) {
    +        // only custom defined properties
    +        if 
(!getSupportedPropertyDescriptors().contains(property.getKey())) {
    +          customAttributes.add(property.getKey());
    +        }
    +      }
    +    }
    +  }
    +
    +
    +  @Override
    +  public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
    +    // Check if the client is currently connected, as a previous trigger 
could have detected a failure
    +    // in the connection.
    +    if (riemannClient == null || !riemannClient.isConnected()) {
    +      // clean up the client and attempt to re-initialize the processor
    +      cleanUpClient();
    +      onScheduled(context);
    +    }
    +
    +    List<FlowFile> incomingFlowFiles = session.get(batchSize);
    +    List<FlowFile> successfulFlowFiles = new 
ArrayList<>(incomingFlowFiles.size());
    +    List<Event> eventsQueue = new ArrayList<>(incomingFlowFiles.size());
    +    for (FlowFile flowFile : incomingFlowFiles) {
    +      try {
    +        eventsQueue.add(FlowFileToEvent.fromAttributes(context, 
customAttributes, flowFile));
    +        successfulFlowFiles.add(flowFile);
    +      } catch (NumberFormatException e) {
    +        getLogger().warn(e.getMessage());
    +        session.transfer(flowFile, REL_FAILURE);
    +      }
    +    }
    +    try {
    +      if (transport == Transport.TCP) {
    +        Proto.Msg returnMessage = 
riemannClient.sendEvents(eventsQueue).deref(writeTimeout, 
TimeUnit.MILLISECONDS);
    +        if (returnMessage == null) {
    +          context.yield();
    +          throw new ProcessException("Timed out writing to Riemann!");
    +        }
    +      } else {
    +        riemannClient.sendEvents(eventsQueue);
    +      }
    +      riemannClient.flush();
    +    } catch (Exception e) {
    +      context.yield();
    +      throw new ProcessException("Failed writing to Riemann\n" + 
e.getMessage());
    +    }
    +    session.transfer(successfulFlowFiles, REL_SUCCESS);
    +    session.commit();
    --- End diff --
    
    Framework provides this at the conclusion of the onTrigger method as per 
AbstractProcessor#onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory)


> Add Processor for Writing Events to Riemann
> -------------------------------------------
>
>                 Key: NIFI-987
>                 URL: https://issues.apache.org/jira/browse/NIFI-987
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Ricky Saltzer
>            Assignee: Ricky Saltzer
>             Fix For: 0.4.0
>
>         Attachments: Sample Riemann Dataflow .png
>
>
> Riemann (http://riemann.io) is a new framework for monitoring distributed 
> systems. It's particular useful for sending ad-hoc events such as, heartbeats 
> and metrics. It would be nice if NiFi had a PutRiemann processor for writing 
> events using the NiFi expression language.
> A simple use case would be a data flow that repeatedly checks specific 
> services over TCP, HTTP, etc and checks into Riemann. Another example would 
> be detecting a blip in events coming down a stream using a simple event 
> heartbeat mechanism. I'll post a couple visuals to help make these examples 
> more concrete.
> I have an initial PutRiemann processor made. I will post the patch via a 
> Github pull request later today.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to