[
https://issues.apache.org/jira/browse/NIFI-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167517#comment-15167517
]
ASF GitHub Bot commented on NIFI-1420:
--------------------------------------
Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/233#discussion_r54134089
--- Diff:
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
---
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
+import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
+import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A base class for processors that send data to an external system using
TCP or UDP.
+ */
+public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryProcessor {
+
+ public static final PropertyDescriptor HOSTNAME = new
PropertyDescriptor.Builder()
+ .name("Hostname")
+ .description("The ip address or hostname of the destination.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("localhost")
+ .required(true)
+ .build();
+ public static final PropertyDescriptor PORT = new PropertyDescriptor
+ .Builder().name("Port")
+ .description("The port on the destination.")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE =
new PropertyDescriptor.Builder()
+ .name("Max Size of Socket Send Buffer")
+ .description("The maximum size of the socket send buffer that
should be used. This is a suggestion to the Operating System " +
+ "to indicate how big the socket buffer should be. If
this value is set too low, the buffer may fill up before " +
+ "the data can be read, and incoming data will be
dropped.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .required(true)
+ .build();
+ public static final PropertyDescriptor CHARSET = new
PropertyDescriptor.Builder()
+ .name("Character Set")
+ .description("Specifies the character set of the data being
sent.")
+ .required(true)
+ .defaultValue("UTF-8")
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Timeout")
+ .description("The timeout for connecting to and communicating
with the destination. Does not apply to UDP")
+ .required(false)
+ .defaultValue("10 seconds")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor IDLE_EXPIRATION = new
PropertyDescriptor
+ .Builder().name("Idle Connection Expiration")
+ .description("The amount of time a connection should be held
open without being used before closing the connection.")
+ .required(true)
+ .defaultValue("5 seconds")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ // Putting these properties here so sub-classes don't have to redefine
them, but they are
+ // not added to the properties by default since not all processors may
need them
+
+ public static final AllowableValue TCP_VALUE = new
AllowableValue("TCP", "TCP");
+ public static final AllowableValue UDP_VALUE = new
AllowableValue("UDP", "UDP");
+
+ public static final PropertyDescriptor PROTOCOL = new
PropertyDescriptor
+ .Builder().name("Protocol")
+ .description("The protocol for communication.")
+ .required(true)
+ .allowableValues(TCP_VALUE, UDP_VALUE)
+ .defaultValue(UDP_VALUE.getValue())
+ .build();
+ public static final PropertyDescriptor MESSAGE_DELIMITER = new
PropertyDescriptor.Builder()
+ .name("Message Delimiter")
+ .description("Specifies the delimiter to use for splitting
apart multiple messages within a single FlowFile. "
+ + "If not specified, the entire content of the
FlowFile will be used as a single message. "
+ + "If specified, the contents of the FlowFile will be
split on this delimiter and each section "
+ + "sent as a separate message. Note that if messages
are delimited and some messages for a given FlowFile "
+ + "are transferred successfully while others are not,
the messages will be split into individual FlowFiles, such that those "
+ + "messages that were successfully sent are routed to
the 'success' relationship while other messages are sent to the 'failure' "
+ + "relationship.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("FlowFiles that are sent successfully to the
destination are sent out this relationship.")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles that failed to send to the destination
are sent out this relationship.")
+ .build();
+
+ private Set<Relationship> relationships;
+ private List<PropertyDescriptor> descriptors;
+
+ protected volatile String transitUri;
+ protected volatile BlockingQueue<ChannelSender> senderPool;
+
+ protected final BlockingQueue<FlowFileMessageBatch> completeBatches =
new LinkedBlockingQueue<>();
+ protected final Set<FlowFileMessageBatch> activeBatches =
Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(HOSTNAME);
+ descriptors.add(PORT);
+ descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
+ descriptors.add(CHARSET);
+ descriptors.add(TIMEOUT);
+ descriptors.add(IDLE_EXPIRATION);
+ descriptors.addAll(getAdditionalProperties());
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ relationships.addAll(getAdditionalRelationships());
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ /**
+ * Override to provide additional relationships for the processor.
+ *
+ * @return a list of relationships
+ */
+ protected List<Relationship> getAdditionalRelationships() {
+ return Collections.EMPTY_LIST;
+ }
+
+ /**
+ * Override to provide additional properties for the processor.
+ *
+ * @return a list of properties
+ */
+ protected List<PropertyDescriptor> getAdditionalProperties() {
+ return Collections.EMPTY_LIST;
+ }
--- End diff --
The idea was to make getSupportedPropertyDescriptors() final so that the
properties provided by the base class had to be there, and then provide
getAdditionalProperties() to allow a subclass to add additional properties.
> Splunk Processors
> -----------------
>
> Key: NIFI-1420
> URL: https://issues.apache.org/jira/browse/NIFI-1420
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Bryan Bende
> Assignee: Bryan Bende
> Priority: Minor
> Fix For: 0.6.0
>
>
> To continue improving NiFi's ability to collect logs, a good integration
> point would be to have a processor that could listen for data from a Splunk
> forwarder (https://docs.splunk.com/Splexicon:Universalforwarder). Being able
> to push log messages to Splunk would also be useful.
> Splunk provides an SDK that may be helpful:
> https://github.com/splunk/splunk-sdk-java
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)