NIFI-1420 Adding Splunk bundle containing PutSplunk, and GetSplunk, and adding 
a ListenTCP processor to standard processors. Refactored internal code from 
PutSyslog to create a generic AbstractPutEventProcessor which PutSplunk extends 
from.

This closes #233


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6f5fb594
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6f5fb594
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6f5fb594

Branch: refs/heads/master
Commit: 6f5fb5947911baf3da6892310d01148d8aa50f30
Parents: 4ce7b67
Author: Bryan Bende <[email protected]>
Authored: Mon Mar 7 18:07:06 2016 -0500
Committer: Bryan Bende <[email protected]>
Committed: Mon Mar 7 18:21:17 2016 -0500

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   5 +
 .../listen/AbstractListenEventProcessor.java    |   4 +
 .../dispatcher/SocketChannelDispatcher.java     |  31 +-
 .../handler/socket/SSLSocketChannelHandler.java |  19 +-
 .../socket/StandardSocketChannelHandler.java    |  25 +-
 .../util/put/AbstractPutEventProcessor.java     | 476 ++++++++++++++++
 .../util/put/sender/ChannelSender.java          | 103 ++++
 .../util/put/sender/DatagramChannelSender.java  |  80 +++
 .../util/put/sender/SSLSocketChannelSender.java |  71 +++
 .../util/put/sender/SocketChannelSender.java    |  97 ++++
 .../remote/io/socket/ssl/SSLSocketChannel.java  |  24 +-
 .../nifi-splunk-bundle/nifi-splunk-nar/pom.xml  |  42 ++
 .../src/main/resources/META-INF/LICENSE         | 203 +++++++
 .../src/main/resources/META-INF/NOTICE          |  24 +
 .../nifi-splunk-processors/pom.xml              |  68 +++
 .../nifi/processors/splunk/GetSplunk.java       | 543 +++++++++++++++++++
 .../nifi/processors/splunk/PutSplunk.java       | 342 ++++++++++++
 .../org.apache.nifi.processor.Processor         |  16 +
 .../nifi/processors/splunk/TestGetSplunk.java   | 283 ++++++++++
 .../nifi/processors/splunk/TestPutSplunk.java   | 370 +++++++++++++
 .../processors/splunk/util/LogGenerator.java    |  73 +++
 nifi-nar-bundles/nifi-splunk-bundle/pom.xml     |  59 ++
 .../standard/AbstractSyslogProcessor.java       |   8 +
 .../nifi/processors/standard/ListenTCP.java     | 226 ++++++++
 .../nifi/processors/standard/PutSyslog.java     | 215 +-------
 .../org.apache.nifi.processor.Processor         |   1 +
 .../standard/TestListenAndPutSyslog.java        | 175 ++++++
 .../processors/standard/TestListenRELP.java     |  24 +-
 .../processors/standard/TestListenSyslog.java   |  13 +-
 .../nifi/processors/standard/TestListenTCP.java | 275 ++++++++++
 .../nifi/processors/standard/TestPutSyslog.java |  51 +-
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |   6 +
 33 files changed, 3711 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index a7dfb3f..6088e63 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -275,6 +275,11 @@ language governing permissions and limitations under the 
License. -->
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-amqp-nar</artifactId>
+               <type>nar</type>
+           </dependency>
+           <dependency>
+               <groupId>org.apache.nifi</groupId>
+               <artifactId>nifi-splunk-nar</artifactId>
             <type>nar</type>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index d56255d..84fa5dc 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -220,6 +220,10 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
         return errorEvents.size();
     }
 
+    public int getQueueSize() {
+        return events == null ? 0 : events.size();
+    }
+
     @OnUnscheduled
     public void onUnscheduled() {
         if (dispatcher != null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
index da5c414..670dba4 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
@@ -22,6 +22,7 @@ import org.apache.nifi.processor.util.listen.event.Event;
 import org.apache.nifi.processor.util.listen.event.EventFactory;
 import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
 import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.security.util.SslContextFactory;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -56,6 +57,7 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
     private final ProcessorLog logger;
     private final int maxConnections;
     private final SSLContext sslContext;
+    private final SslContextFactory.ClientAuth clientAuth;
     private final Charset charset;
 
     private ExecutorService executor;
@@ -64,6 +66,16 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
     private final BlockingQueue<SelectionKey> keyQueue;
     private final AtomicInteger currentConnections = new AtomicInteger(0);
 
+    public SocketChannelDispatcher(final EventFactory<E> eventFactory,
+                                   final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
+                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final BlockingQueue<E> events,
+                                   final ProcessorLog logger,
+                                   final int maxConnections,
+                                   final SSLContext sslContext,
+                                   final Charset charset) {
+        this(eventFactory, handlerFactory, bufferPool, events, logger, 
maxConnections, sslContext, SslContextFactory.ClientAuth.REQUIRED, charset);
+    }
 
     public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                                    final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
@@ -72,6 +84,7 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
                                    final ProcessorLog logger,
                                    final int maxConnections,
                                    final SSLContext sslContext,
+                                   final SslContextFactory.ClientAuth 
clientAuth,
                                    final Charset charset) {
         this.eventFactory = eventFactory;
         this.handlerFactory = handlerFactory;
@@ -81,6 +94,7 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
         this.maxConnections = maxConnections;
         this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
         this.sslContext = sslContext;
+        this.clientAuth = clientAuth;
         this.charset = charset;
 
         if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() 
!= maxConnections) {
@@ -152,7 +166,22 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
                             SSLSocketChannel sslSocketChannel = null;
                             if (sslContext != null) {
                                 final SSLEngine sslEngine = 
sslContext.createSSLEngine();
-                                sslSocketChannel = new 
SSLSocketChannel(sslEngine, socketChannel, false);
+                                sslEngine.setUseClientMode(false);
+
+                                switch (clientAuth) {
+                                    case REQUIRED:
+                                        sslEngine.setNeedClientAuth(true);
+                                        break;
+                                    case WANT:
+                                        sslEngine.setWantClientAuth(true);
+                                        break;
+                                    case NONE:
+                                        sslEngine.setNeedClientAuth(false);
+                                        sslEngine.setWantClientAuth(false);
+                                        break;
+                                }
+
+                                sslSocketChannel = new 
SSLSocketChannel(sslEngine, socketChannel);
                             }
 
                             // Attach the buffer and SSLSocketChannel to the 
key

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
index 6a2c6f8..460ef08 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
@@ -129,17 +129,20 @@ public class SSLSocketChannelHandler<E extends 
Event<SocketChannel>> extends Soc
         // go through the buffer looking for the end of each message
         for (int i = 0; i < bytesRead; i++) {
             final byte currByte = buffer[i];
-            currBytes.write(currByte);
 
             // check if at end of a message
             if (currByte == getDelimiter()) {
-                final SSLSocketChannelResponder response = new 
SSLSocketChannelResponder(socketChannel, sslSocketChannel);
-                final Map<String,String> metadata = 
EventFactoryUtil.createMapWithSender(sender.toString());
-
-                // queue the raw event blocking until space is available, 
reset the temporary buffer
-                final E event = eventFactory.create(currBytes.toByteArray(), 
metadata, response);
-                events.put(event);
-                currBytes.reset();
+                if (currBytes.size() > 0) {
+                    final SSLSocketChannelResponder response = new 
SSLSocketChannelResponder(socketChannel, sslSocketChannel);
+                    final Map<String, String> metadata = 
EventFactoryUtil.createMapWithSender(sender.toString());
+
+                    // queue the raw event blocking until space is available, 
reset the temporary buffer
+                    final E event = 
eventFactory.create(currBytes.toByteArray(), metadata, response);
+                    events.put(event);
+                    currBytes.reset();
+                }
+            } else {
+                currBytes.write(currByte);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
index f12e705..e2fd3a8 100644
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
@@ -131,20 +131,23 @@ public class StandardSocketChannelHandler<E extends 
Event<SocketChannel>> extend
             // NOTE: For higher throughput, the looking for \n and copying 
into the byte stream could be improved
             // Pull data out of buffer and cram into byte array
             byte currByte = socketBuffer.get();
-            currBytes.write(currByte);
 
             // check if at end of a message
             if (currByte == getDelimiter()) {
-                final SocketChannelResponder response = new 
SocketChannelResponder(socketChannel);
-                final Map<String,String> metadata = 
EventFactoryUtil.createMapWithSender(sender.toString());
-
-                // queue the raw event blocking until space is available, 
reset the buffer
-                final E event = eventFactory.create(currBytes.toByteArray(), 
metadata, response);
-                events.put(event);
-                currBytes.reset();
-
-                // Mark this as the start of the next message
-                socketBuffer.mark();
+                if (currBytes.size() > 0) {
+                    final SocketChannelResponder response = new 
SocketChannelResponder(socketChannel);
+                    final Map<String, String> metadata = 
EventFactoryUtil.createMapWithSender(sender.toString());
+
+                    // queue the raw event blocking until space is available, 
reset the buffer
+                    final E event = 
eventFactory.create(currBytes.toByteArray(), metadata, response);
+                    events.put(event);
+                    currBytes.reset();
+
+                    // Mark this as the start of the next message
+                    socketBuffer.mark();
+                }
+            } else {
+                currBytes.write(currByte);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
new file mode 100644
index 0000000..c7313dc
--- /dev/null
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
+import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
+import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A base class for processors that send data to an external system using TCP 
or UDP.
+ */
+public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryProcessor {
+
+    public static final PropertyDescriptor HOSTNAME = new 
PropertyDescriptor.Builder()
+            .name("Hostname")
+            .description("The ip address or hostname of the destination.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("localhost")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port on the destination.")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+            .name("Max Size of Socket Send Buffer")
+            .description("The maximum size of the socket send buffer that 
should be used. This is a suggestion to the Operating System " +
+                    "to indicate how big the socket buffer should be. If this 
value is set too low, the buffer may fill up before " +
+                    "the data can be read, and incoming data will be dropped.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the data being sent.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Timeout")
+            .description("The timeout for connecting to and communicating with 
the destination. Does not apply to UDP")
+            .required(false)
+            .defaultValue("10 seconds")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor IDLE_EXPIRATION = new 
PropertyDescriptor
+            .Builder().name("Idle Connection Expiration")
+            .description("The amount of time a connection should be held open 
without being used before closing the connection.")
+            .required(true)
+            .defaultValue("5 seconds")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    // Putting these properties here so sub-classes don't have to redefine 
them, but they are
+    // not added to the properties by default since not all processors may 
need them
+
+    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", 
"TCP");
+    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", 
"UDP");
+
+    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
+            .Builder().name("Protocol")
+            .description("The protocol for communication.")
+            .required(true)
+            .allowableValues(TCP_VALUE, UDP_VALUE)
+            .defaultValue(UDP_VALUE.getValue())
+            .build();
+    public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
+            .name("Message Delimiter")
+            .description("Specifies the delimiter to use for splitting apart 
multiple messages within a single FlowFile. "
+                    + "If not specified, the entire content of the FlowFile 
will be used as a single message. "
+                    + "If specified, the contents of the FlowFile will be 
split on this delimiter and each section "
+                    + "sent as a separate message. Note that if messages are 
delimited and some messages for a given FlowFile "
+                    + "are transferred successfully while others are not, the 
messages will be split into individual FlowFiles, such that those "
+                    + "messages that were successfully sent are routed to the 
'success' relationship while other messages are sent to the 'failure' "
+                    + "relationship.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are sent successfully to the 
destination are sent out this relationship.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles that failed to send to the destination are 
sent out this relationship.")
+            .build();
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> descriptors;
+
+    protected volatile String transitUri;
+    protected volatile BlockingQueue<ChannelSender> senderPool;
+
+    protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new 
LinkedBlockingQueue<>();
+    protected final Set<FlowFileMessageBatch> activeBatches = 
Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(HOSTNAME);
+        descriptors.add(PORT);
+        descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
+        descriptors.add(CHARSET);
+        descriptors.add(TIMEOUT);
+        descriptors.add(IDLE_EXPIRATION);
+        descriptors.addAll(getAdditionalProperties());
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.addAll(getAdditionalRelationships());
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    /**
+     * Override to provide additional relationships for the processor.
+     *
+     * @return a list of relationships
+     */
+    protected List<Relationship> getAdditionalRelationships() {
+        return Collections.EMPTY_LIST;
+    }
+
+    /**
+     * Override to provide additional properties for the processor.
+     *
+     * @return a list of properties
+     */
+    protected List<PropertyDescriptor> getAdditionalProperties() {
+        return Collections.EMPTY_LIST;
+    }
+
+    @Override
+    public final Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws IOException {
+        // initialize the queue of senders, one per task, senders will get 
created on the fly in onTrigger
+        this.senderPool = new 
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+        this.transitUri = createTransitUri(context);
+    }
+
+    @OnStopped
+    public void closeSenders() {
+        if (senderPool != null) {
+            ChannelSender sender = senderPool.poll();
+            while (sender != null) {
+                sender.close();
+                sender = senderPool.poll();
+            }
+        }
+    }
+
+    /**
+     * Sub-classes construct a transit uri for provenance events. Called from 
@OnScheduled
+     * method of this class.
+     *
+     * @param context the current context
+     *
+     * @return the transit uri
+     */
+    protected abstract String createTransitUri(final ProcessContext context);
+
+    /**
+     * Sub-classes create a ChannelSender given a context.
+     *
+     * @param context the current context
+     * @return an implementation of ChannelSender
+     * @throws IOException if an error occurs creating the ChannelSender
+     */
+    protected abstract ChannelSender createSender(final ProcessContext 
context) throws IOException;
+
+    /**
+     * Close any senders that haven't been active with in the given threshold
+     *
+     * @param idleThreshold the threshold to consider a sender as idle
+     */
+    protected void pruneIdleSenders(final long idleThreshold) {
+        long currentTime = System.currentTimeMillis();
+        final List<ChannelSender> putBack = new ArrayList<>();
+
+        // if a connection hasn't been used with in the threshold then it gets 
closed
+        ChannelSender sender;
+        while ((sender = senderPool.poll()) != null) {
+            if (currentTime > (sender.getLastUsed() + idleThreshold)) {
+                getLogger().debug("Closing idle connection...");
+                sender.close();
+            } else {
+                putBack.add(sender);
+            }
+        }
+        // re-queue senders that weren't idle, but if the queue is full then 
close the sender
+        for (ChannelSender putBackSender : putBack) {
+            boolean returned = senderPool.offer(putBackSender);
+            if (!returned) {
+                putBackSender.close();
+            }
+        }
+    }
+
+    /**
+     * Helper for sub-classes to create a sender.
+     *
+     * @param protocol the protocol for the sender
+     * @param host the host to send to
+     * @param port the port to send to
+     * @param timeout the timeout for connecting and communicating over the 
channel
+     * @param maxSendBufferSize the maximum size of the socket send buffer
+     * @param sslContext an SSLContext, or null if not using SSL
+     *
+     * @return a ChannelSender based on the given properties
+     *
+     * @throws IOException if an error occurs creating the sender
+     */
+    protected ChannelSender createSender(final String protocol,
+                                         final String host,
+                                         final int port,
+                                         final int timeout,
+                                         final int maxSendBufferSize,
+                                         final SSLContext sslContext) throws 
IOException {
+
+        ChannelSender sender;
+        if (protocol.equals(UDP_VALUE.getValue())) {
+            sender = new DatagramChannelSender(host, port, maxSendBufferSize, 
getLogger());
+        } else {
+            // if an SSLContextService is provided then we make a secure sender
+            if (sslContext != null) {
+                sender = new SSLSocketChannelSender(host, port, 
maxSendBufferSize, sslContext, getLogger());
+            } else {
+                sender = new SocketChannelSender(host, port, 
maxSendBufferSize, getLogger());
+            }
+        }
+
+        sender.setTimeout(timeout);
+        sender.open();
+        return sender;
+    }
+
+    /**
+     * Represents a range of messages from a FlowFile.
+     */
+    protected static class Range {
+        private final long start;
+        private final long end;
+
+        public Range(final long start, final long end) {
+            this.start = start;
+            this.end = end;
+        }
+
+        public long getStart() {
+            return start;
+        }
+
+        public long getEnd() {
+            return end;
+        }
+
+        @Override
+        public String toString() {
+            return "Range[" + start + "-" + end + "]";
+        }
+    }
+
+    /**
+     * A wrapper to hold the ranges of a FlowFile that were successful and 
ranges that failed, and then
+     * transfer those ranges appropriately.
+     */
+    protected class FlowFileMessageBatch {
+
+        private final ProcessSession session;
+        private final FlowFile flowFile;
+        private final long startTime = System.nanoTime();
+
+        private final List<Range> successfulRanges = new ArrayList<>();
+        private final List<Range> failedRanges = new ArrayList<>();
+
+        private Exception lastFailureReason;
+        private long numMessages = -1L;
+        private long completeTime = 0L;
+        private boolean canceled = false;
+
+        public FlowFileMessageBatch(final ProcessSession session, final 
FlowFile flowFile) {
+            this.session = session;
+            this.flowFile = flowFile;
+        }
+
+        public synchronized void cancelOrComplete() {
+            if (isComplete()) {
+                completeSession();
+                return;
+            }
+
+            this.canceled = true;
+
+            session.rollback();
+            successfulRanges.clear();
+            failedRanges.clear();
+        }
+
+        public synchronized void addSuccessfulRange(final long start, final 
long end) {
+            if (canceled) {
+                return;
+            }
+
+            successfulRanges.add(new Range(start, end));
+
+            if (isComplete()) {
+                activeBatches.remove(this);
+                completeBatches.add(this);
+                completeTime = System.nanoTime();
+            }
+        }
+
+        public synchronized void addFailedRange(final long start, final long 
end, final Exception e) {
+            if (canceled) {
+                return;
+            }
+
+            failedRanges.add(new Range(start, end));
+            lastFailureReason = e;
+
+            if (isComplete()) {
+                activeBatches.remove(this);
+                completeBatches.add(this);
+                completeTime = System.nanoTime();
+            }
+        }
+
+        private boolean isComplete() {
+            return !canceled && (numMessages > -1) && (successfulRanges.size() 
+ failedRanges.size() >= numMessages);
+        }
+
+        public synchronized void setNumMessages(final long msgCount) {
+            this.numMessages = msgCount;
+
+            if (isComplete()) {
+                activeBatches.remove(this);
+                completeBatches.add(this);
+                completeTime = System.nanoTime();
+            }
+        }
+
+        private void transferRanges(final List<Range> ranges, final 
Relationship relationship) {
+            Collections.sort(ranges, new Comparator<Range>() {
+                @Override
+                public int compare(final Range o1, final Range o2) {
+                    return Long.compare(o1.getStart(), o2.getStart());
+                }
+            });
+
+            for (int i = 0; i < ranges.size(); i++) {
+                Range range = ranges.get(i);
+                int count = 1;
+
+                while (i + 1 < ranges.size()) {
+                    // Check if the next range in the List continues where 
this one left off.
+                    final Range nextRange = ranges.get(i + 1);
+
+                    if (nextRange.getStart() == range.getEnd()) {
+                        // We have two ranges in a row that are contiguous; 
combine them into a single Range.
+                        range = new Range(range.getStart(), 
nextRange.getEnd());
+
+                        count++;
+                        i++;
+                    } else {
+                        break;
+                    }
+                }
+
+                // Create a FlowFile for this range.
+                FlowFile child = session.clone(flowFile, range.getStart(), 
range.getEnd() - range.getStart());
+                if (relationship == REL_SUCCESS) {
+                    session.getProvenanceReporter().send(child, transitUri, 
"Sent " + count + " messages");
+                    session.transfer(child, relationship);
+                } else {
+                    child = session.penalize(child);
+                    session.transfer(child, relationship);
+                }
+            }
+        }
+
+        public synchronized void completeSession() {
+            if (canceled) {
+                return;
+            }
+
+            if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
+                getLogger().info("Completed processing {} but sent 0 
FlowFiles", new Object[] {flowFile});
+                session.transfer(flowFile, REL_SUCCESS);
+                session.commit();
+                return;
+            }
+
+            if (successfulRanges.isEmpty()) {
+                getLogger().error("Failed to send {}; routing to 'failure'; 
last failure reason reported was {};", new Object[] {flowFile, 
lastFailureReason});
+                final FlowFile penalizedFlowFile = session.penalize(flowFile);
+                session.transfer(penalizedFlowFile, REL_FAILURE);
+                session.commit();
+                return;
+            }
+
+            if (failedRanges.isEmpty()) {
+                final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
+                session.getProvenanceReporter().send(flowFile, transitUri, 
"Sent " + successfulRanges.size() + " messages;", transferMillis);
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().info("Successfully sent {} messages for {} in {} 
millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
+                session.commit();
+                return;
+            }
+
+            // At this point, the successful ranges is not empty and the 
failed ranges is not empty. This indicates that some messages made their way
+            // successfully and some failed. We will address this by splitting 
apart the source FlowFile into children and sending the successful messages to 
'success'
+            // and the failed messages to 'failure'.
+            transferRanges(successfulRanges, REL_SUCCESS);
+            transferRanges(failedRanges, REL_FAILURE);
+            session.remove(flowFile);
+            getLogger().error("Successfully sent {} messages, but failed to 
send {} messages; the last error received was {}",
+                    new Object[] {successfulRanges.size(), 
failedRanges.size(), lastFailureReason});
+            session.commit();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
new file mode 100644
index 0000000..8c92b1f
--- /dev/null
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/ChannelSender.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put.sender;
+
+import org.apache.nifi.logging.ProcessorLog;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+/**
+ * Base class for sending messages over a channel.
+ */
+public abstract class ChannelSender {
+
+    protected final int port;
+    protected final String host;
+    protected final int maxSendBufferSize;
+    protected final ProcessorLog logger;
+
+    protected volatile int timeout = 10000;
+    protected volatile long lastUsed;
+
+    public ChannelSender(final String host, final int port, final int 
maxSendBufferSize, final ProcessorLog logger) {
+        this.port = port;
+        this.host = host;
+        this.maxSendBufferSize = maxSendBufferSize;
+        this.logger = logger;
+    }
+
+    public void setTimeout(int timeout) {
+        this.timeout = timeout;
+    }
+
+    public int getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * @return the last time data was sent over this channel
+     */
+    public long getLastUsed() {
+        return lastUsed;
+    }
+
+    /**
+     * Opens the connection to the destination.
+     *
+     * @throws IOException if an error occurred opening the connection.
+     */
+    public abstract void open() throws IOException;
+
+    /**
+     * Sends the given string over the channel.
+     *
+     * @param message the message to send over the channel
+     * @throws IOException if there was an error communicating over the channel
+     */
+    public void send(final String message, final Charset charset) throws 
IOException {
+        final byte[] bytes = message.getBytes(charset);
+        send(bytes);
+    }
+
+    /**
+     * Sends the given data over the channel.
+     *
+     * @param data the data to send over the channel
+     * @throws IOException if there was an error communicating over the channel
+     */
+    public void send(final byte[] data) throws IOException {
+        write(data);
+        lastUsed = System.currentTimeMillis();
+    }
+
+    /**
+     * Write the given buffer to the underlying channel.
+     */
+    protected abstract void write(byte[] data) throws IOException;
+
+    /**
+     * @return true if the underlying channel is connected
+     */
+    public abstract boolean isConnected();
+
+    /**
+     * Close the underlying channel
+     */
+    public abstract void close();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
new file mode 100644
index 0000000..632c6e5
--- /dev/null
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/DatagramChannelSender.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put.sender;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ProcessorLog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+/**
+ * Sends messages over a DatagramChannel.
+ */
+public class DatagramChannelSender extends ChannelSender {
+
+    private DatagramChannel channel;
+
+    public DatagramChannelSender(final String host, final int port, final int 
maxSendBufferSize, final ProcessorLog logger) {
+        super(host, port, maxSendBufferSize, logger);
+    }
+
+    @Override
+    public void open() throws IOException {
+        if (channel == null) {
+            channel = DatagramChannel.open();
+
+            if (maxSendBufferSize > 0) {
+                channel.setOption(StandardSocketOptions.SO_SNDBUF, 
maxSendBufferSize);
+                final int actualSendBufSize = 
channel.getOption(StandardSocketOptions.SO_SNDBUF);
+                if (actualSendBufSize < maxSendBufferSize) {
+                    logger.warn("Attempted to set Socket Send Buffer Size to " 
+ maxSendBufferSize
+                            + " bytes but could only set to " + 
actualSendBufSize + "bytes. You may want to "
+                            + "consider changing the Operating System's 
maximum receive buffer");
+                }
+            }
+        }
+
+        if (!channel.isConnected()) {
+            channel.connect(new InetSocketAddress(InetAddress.getByName(host), 
port));
+        }
+    }
+
+    @Override
+    protected void write(byte[] data) throws IOException {
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        while (buffer.hasRemaining()) {
+            channel.write(buffer);
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        return channel != null && channel.isConnected();
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(channel);
+        channel = null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
new file mode 100644
index 0000000..dc85d80
--- /dev/null
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SSLSocketChannelSender.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put.sender;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+
+/**
+ * Sends messages over an SSLSocketChannel.
+ */
+public class SSLSocketChannelSender extends SocketChannelSender {
+
+    private SSLSocketChannel sslChannel;
+    private SSLContext sslContext;
+
+    public SSLSocketChannelSender(final String host,
+                                  final int port,
+                                  final int maxSendBufferSize,
+                                  final SSLContext sslContext,
+                                  final ProcessorLog logger) {
+        super(host, port, maxSendBufferSize, logger);
+        this.sslContext = sslContext;
+    }
+
+    @Override
+    public void open() throws IOException {
+        if (sslChannel == null) {
+            super.open();
+            sslChannel = new SSLSocketChannel(sslContext, channel, true);
+        }
+        sslChannel.setTimeout(timeout);
+
+        // SSLSocketChannel will check if already connected so we can safely 
call this
+        sslChannel.connect();
+    }
+
+    @Override
+    protected void write(byte[] data) throws IOException {
+        sslChannel.write(data);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return sslChannel != null && !sslChannel.isClosed();
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        IOUtils.closeQuietly(sslChannel);
+        sslChannel = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
new file mode 100644
index 0000000..fcf341a
--- /dev/null
+++ 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */package org.apache.nifi.processor.util.put.sender;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.StandardSocketOptions;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Sends messages over a SocketChannel.
+ */
+public class SocketChannelSender extends ChannelSender {
+
+    protected SocketChannel channel;
+    protected SocketChannelOutputStream socketChannelOutput;
+
+    public SocketChannelSender(final String host, final int port, final int 
maxSendBufferSize, final ProcessorLog logger) {
+        super(host, port, maxSendBufferSize, logger);
+    }
+
+    @Override
+    public void open() throws IOException {
+        if (channel == null) {
+            channel = SocketChannel.open();
+            channel.configureBlocking(false);
+
+            if (maxSendBufferSize > 0) {
+                channel.setOption(StandardSocketOptions.SO_SNDBUF, 
maxSendBufferSize);
+                final int actualSendBufSize = 
channel.getOption(StandardSocketOptions.SO_SNDBUF);
+                if (actualSendBufSize < maxSendBufferSize) {
+                    logger.warn("Attempted to set Socket Send Buffer Size to " 
+ maxSendBufferSize
+                            + " bytes but could only set to " + 
actualSendBufSize + "bytes. You may want to "
+                            + "consider changing the Operating System's 
maximum receive buffer");
+                }
+            }
+        }
+
+        if (!channel.isConnected()) {
+            final long startTime = System.currentTimeMillis();
+            final InetSocketAddress socketAddress = new 
InetSocketAddress(InetAddress.getByName(host), port);
+
+            if (!channel.connect(socketAddress)) {
+                while (!channel.finishConnect()) {
+                    if (System.currentTimeMillis() > startTime + timeout) {
+                        throw new SocketTimeoutException("Timed out connecting 
to " + host + ":" + port);
+                    }
+
+                    try {
+                        Thread.sleep(50L);
+                    } catch (final InterruptedException e) {
+                    }
+                }
+            }
+
+            socketChannelOutput = new SocketChannelOutputStream(channel);
+            socketChannelOutput.setTimeout(timeout);
+        }
+    }
+
+    @Override
+    protected void write(byte[] data) throws IOException {
+        socketChannelOutput.write(data);
+    }
+
+    @Override
+    public boolean isConnected() {
+        return channel != null && channel.isConnected();
+    }
+
+    @Override
+    public void close() {
+        IOUtils.closeQuietly(socketChannelOutput);
+        IOUtils.closeQuietly(channel);
+        socketChannelOutput = null;
+        channel = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
index 408eb59..2209e38 100644
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java
@@ -85,10 +85,27 @@ public class SSLSocketChannel implements Closeable {
     }
 
     public SSLSocketChannel(final SSLContext sslContext, final SocketChannel 
socketChannel, final boolean client) throws IOException {
-        this(sslContext.createSSLEngine(), socketChannel, client);
+        if (!socketChannel.isConnected()) {
+            throw new IllegalArgumentException("Cannot pass an un-connected 
SocketChannel");
+        }
+
+        this.channel = socketChannel;
+
+        this.socketAddress = socketChannel.getRemoteAddress();
+        final Socket socket = socketChannel.socket();
+        this.hostname = socket.getInetAddress().getHostName();
+        this.port = socket.getPort();
+
+        this.engine = sslContext.createSSLEngine();
+        this.engine.setUseClientMode(client);
+        this.engine.setNeedClientAuth(true);
+
+        streamInManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        streamOutManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
+        appDataManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize()));
     }
 
-    public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel 
socketChannel, final boolean client) throws IOException {
+    public SSLSocketChannel(final SSLEngine sslEngine, final SocketChannel 
socketChannel) throws IOException {
         if (!socketChannel.isConnected()) {
             throw new IllegalArgumentException("Cannot pass an un-connected 
SocketChannel");
         }
@@ -100,9 +117,8 @@ public class SSLSocketChannel implements Closeable {
         this.hostname = socket.getInetAddress().getHostName();
         this.port = socket.getPort();
 
+        // don't set useClientMode or needClientAuth, use the engine as is and 
let the caller configure it
         this.engine = sslEngine;
-        this.engine.setUseClientMode(client);
-        this.engine.setNeedClientAuth(true);
 
         streamInManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));
         streamOutManager = new 
BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/pom.xml 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/pom.xml
new file mode 100644
index 0000000..10bec1c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-splunk-bundle</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-splunk-nar</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+    <packaging>nar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-splunk-processors</artifactId>
+            <version>0.6.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/LICENSE
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/LICENSE
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,203 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..22c3bbe
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,24 @@
+nifi-splunk-nar
+Copyright 2015-2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2012 The Apache Software Foundation
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())

http://git-wip-us.apache.org/repos/asf/nifi/blob/6f5fb594/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/pom.xml 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/pom.xml
new file mode 100644
index 0000000..a1dd6a0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-splunk-bundle</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-splunk-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.splunk</groupId>
+            <artifactId>splunk</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

Reply via email to