[ https://issues.apache.org/jira/browse/NIFI-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167225#comment-15167225 ]

ASF GitHub Bot commented on NIFI-1420:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/233#discussion_r54098353
  
    --- Diff: nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java ---
    @@ -0,0 +1,474 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.put;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.put.sender.ChannelSender;
    +import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
    +import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
    +import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * A base class for processors that send data to an external system using 
TCP or UDP.
    + */
    +public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryProcessor {
    +
    +    public static final PropertyDescriptor HOSTNAME = new 
PropertyDescriptor.Builder()
    +            .name("Hostname")
    +            .description("The ip address or hostname of the destination.")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .defaultValue("localhost")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor
    +            .Builder().name("Port")
    +            .description("The port on the destination.")
    +            .required(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = 
new PropertyDescriptor.Builder()
    +            .name("Max Size of Socket Send Buffer")
    +            .description("The maximum size of the socket send buffer that 
should be used. This is a suggestion to the Operating System " +
    +                    "to indicate how big the socket buffer should be. If 
this value is set too low, the buffer may fill up before " +
    +                    "the data can be read, and incoming data will be 
dropped.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("1 MB")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies the character set of the data being 
sent.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
    +            .name("Timeout")
    +            .description("The timeout for connecting to and communicating 
with the destination. Does not apply to UDP")
    +            .required(false)
    +            .defaultValue("10 seconds")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor IDLE_EXPIRATION = new 
PropertyDescriptor
    +            .Builder().name("Idle Connection Expiration")
    +            .description("The amount of time a connection should be held 
open without being used before closing the connection.")
    +            .required(true)
    +            .defaultValue("5 seconds")
    +            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
    +            .build();
    +
    +    // Putting these properties here so sub-classes don't have to redefine 
them, but they are
    +    // not added to the properties by default since not all processors may 
need them
    +
    +    public static final AllowableValue TCP_VALUE = new 
AllowableValue("TCP", "TCP");
    +    public static final AllowableValue UDP_VALUE = new 
AllowableValue("UDP", "UDP");
    +
    +    public static final PropertyDescriptor PROTOCOL = new 
PropertyDescriptor
    +            .Builder().name("Protocol")
    +            .description("The protocol for communication.")
    +            .required(true)
    +            .allowableValues(TCP_VALUE, UDP_VALUE)
    +            .defaultValue(UDP_VALUE.getValue())
    +            .build();
    +    public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
    +            .name("Message Delimiter")
    +            .description("Specifies the delimiter to use for splitting 
apart multiple messages within a single FlowFile. "
    +                    + "If not specified, the entire content of the 
FlowFile will be used as a single message. "
    +                    + "If specified, the contents of the FlowFile will be 
split on this delimiter and each section "
    +                    + "sent as a separate message. Note that if messages 
are delimited and some messages for a given FlowFile "
    +                    + "are transferred successfully while others are not, 
the messages will be split into individual FlowFiles, such that those "
    +                    + "messages that were successfully sent are routed to 
the 'success' relationship while other messages are sent to the 'failure' "
    +                    + "relationship.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles that are sent successfully to the 
destination are sent out this relationship.")
    +            .build();
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles that failed to send to the destination 
are sent out this relationship.")
    +            .build();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    protected volatile String transitUri;
    +    protected volatile BlockingQueue<ChannelSender> senderPool;
    +
    +    protected final BlockingQueue<FlowFileMessageBatch> completeBatches = 
new LinkedBlockingQueue<>();
    +    protected final Set<FlowFileMessageBatch> activeBatches = 
Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(HOSTNAME);
    +        descriptors.add(PORT);
    +        descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
    +        descriptors.add(CHARSET);
    +        descriptors.add(TIMEOUT);
    +        descriptors.add(IDLE_EXPIRATION);
    +        descriptors.addAll(getAdditionalProperties());
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +        relationships.addAll(getAdditionalRelationships());
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    /**
    +     * Override to provide additional relationships for the processor.
    +     *
    +     * @return a list of relationships
    +     */
    +    protected List<Relationship> getAdditionalRelationships() {
    +        return Collections.EMPTY_LIST;
    +    }
    +
    +    /**
    +     * Override to provide additional properties for the processor.
    +     *
    +     * @return a list of properties
    +     */
    +    protected List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.EMPTY_LIST;
    +    }
    --- End diff --
    
    Is it really necessary to override if that's the default behavior?


> Splunk Processors
> -----------------
>
>                 Key: NIFI-1420
>                 URL: https://issues.apache.org/jira/browse/NIFI-1420
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>             Fix For: 0.6.0
>
>
> To continue improving NiFi's ability to collect logs, a good integration 
> point would be to have a processor that could listen for data from a Splunk 
> forwarder (https://docs.splunk.com/Splexicon:Universalforwarder). Being able 
> to push log messages to Splunk would also be useful.
> Splunk provides an SDK that may be helpful:
> https://github.com/splunk/splunk-sdk-java 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to