http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
new file mode 100644
index 0000000..69a1998
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.event.EventQueue;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Dispatcher that receives datagrams from a non-blocking {@link DatagramChannel}. A single buffer is
+ * borrowed from the buffer pool for the lifetime of {@link #run()}; every received datagram is copied out
+ * of that buffer into an event created by the {@link EventFactory} and offered to the event queue. The
+ * buffer is handed back to the pool when {@link #run()} exits.
+ *
+ * @param <E> the type of event created for each received datagram
+ */
+public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> implements ChannelDispatcher {
+
+    private final EventFactory<E> eventFactory;
+    private final BlockingQueue<ByteBuffer> bufferPool;
+    private final EventQueue<E> events;
+    private final ComponentLog logger;
+    private final String sendingHost;
+    private final Integer sendingPort;
+
+    private Selector selector;
+    private DatagramChannel datagramChannel;
+    private volatile boolean stopped = false;
+
+    /**
+     * Creates a dispatcher that accepts datagrams from any host and port.
+     *
+     * @param eventFactory factory used to turn received bytes into events
+     * @param bufferPool non-empty pool the receive buffer is borrowed from
+     * @param events queue that created events are offered to
+     * @param logger logger for warnings and errors
+     */
+    public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
+                                     final BlockingQueue<ByteBuffer> bufferPool,
+                                     final BlockingQueue<E> events,
+                                     final ComponentLog logger) {
+        this(eventFactory, bufferPool, events, logger, null, null);
+    }
+
+    /**
+     * Creates a dispatcher that, when both sendingHost and sendingPort are non-null, connects the channel
+     * to that address so only datagrams from it are received.
+     *
+     * @param eventFactory factory used to turn received bytes into events
+     * @param bufferPool non-empty pool the receive buffer is borrowed from
+     * @param events queue that created events are offered to
+     * @param logger logger for warnings and errors
+     * @param sendingHost host to restrict receiving to, or null for any host
+     * @param sendingPort port to restrict receiving to, or null for any port
+     *
+     * @throws IllegalArgumentException if bufferPool is null or empty
+     */
+    public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
+                                     final BlockingQueue<ByteBuffer> bufferPool,
+                                     final BlockingQueue<E> events,
+                                     final ComponentLog logger,
+                                     final String sendingHost,
+                                     final Integer sendingPort) {
+        // validate before wiring anything up so an invalid pool is always reported with this message
+        if (bufferPool == null || bufferPool.size() == 0) {
+            throw new IllegalArgumentException("A pool of available ByteBuffers is required");
+        }
+
+        this.eventFactory = eventFactory;
+        this.bufferPool = bufferPool;
+        this.logger = logger;
+        this.sendingHost = sendingHost;
+        this.sendingPort = sendingPort;
+        this.events = new EventQueue<>(events, logger);
+    }
+
+    /**
+     * Opens and binds the datagram channel and registers it with a new selector for reads.
+     *
+     * @param nicAddress local address to bind to, or null for the wildcard address
+     * @param port local port to bind to
+     * @param maxBufferSize requested SO_RCVBUF size in bytes; ignored when not positive
+     *
+     * @throws IOException if the channel or selector cannot be opened, configured or bound
+     */
+    @Override
+    public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
+        stopped = false;
+        datagramChannel = DatagramChannel.open();
+        datagramChannel.configureBlocking(false);
+
+        if (maxBufferSize > 0) {
+            datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
+            final int actualReceiveBufSize = datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+            if (actualReceiveBufSize < maxBufferSize) {
+                logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
+                        + actualReceiveBufSize + " bytes. You may want to consider changing the Operating System's "
+                        + "maximum receive buffer");
+            }
+        }
+
+        // we don't have to worry about nicAddress being null here because InetSocketAddress already handles it
+        datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+        datagramChannel.socket().bind(new InetSocketAddress(nicAddress, port));
+
+        // if a sending host and port were provided then connect to that specific address to only receive
+        // datagrams from that host/port, otherwise we can receive datagrams from any host/port
+        if (sendingHost != null && sendingPort != null) {
+            datagramChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
+        }
+
+        selector = Selector.open();
+        datagramChannel.register(selector, SelectionKey.OP_READ);
+    }
+
+    /**
+     * Receives datagrams until {@link #close()} is called or the thread is interrupted, creating one event
+     * per datagram. Returns immediately, after logging an error, when no buffer is available in the pool.
+     */
+    @Override
+    public void run() {
+        final ByteBuffer buffer = bufferPool.poll();
+        if (buffer == null) {
+            // poll() does not block; without a buffer nothing can be received
+            logger.error("Unable to obtain a ByteBuffer from the buffer pool, no datagrams will be read");
+            return;
+        }
+
+        try {
+            while (!stopped) {
+                try {
+                    int selected = selector.select();
+                    // if stopped the selector could already be closed which would result in a ClosedSelectorException
+                    if (selected > 0 && !stopped) {
+                        Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+                        // if stopped we don't want to modify the keys because close() may still be in progress
+                        while (selectorKeys.hasNext() && !stopped) {
+                            SelectionKey key = selectorKeys.next();
+                            selectorKeys.remove();
+                            if (!key.isValid()) {
+                                continue;
+                            }
+                            DatagramChannel channel = (DatagramChannel) key.channel();
+                            SocketAddress socketAddress;
+                            buffer.clear();
+                            // drain every datagram that is currently available on the channel
+                            while (!stopped && (socketAddress = channel.receive(buffer)) != null) {
+                                String sender = "";
+                                if (socketAddress instanceof InetSocketAddress) {
+                                    sender = ((InetSocketAddress) socketAddress).getAddress().toString();
+                                }
+
+                                // create a byte array from the buffer
+                                buffer.flip();
+                                final byte[] bytes = new byte[buffer.limit()];
+                                buffer.get(bytes, 0, buffer.limit());
+
+                                final Map<String,String> metadata = EventFactoryUtil.createMapWithSender(sender);
+                                final E event = eventFactory.create(bytes, metadata, null);
+                                events.offer(event);
+
+                                buffer.clear();
+                            }
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    stopped = true;
+                    Thread.currentThread().interrupt();
+                } catch (java.nio.channels.ClosedSelectorException e) {
+                    // close() closed the selector between the stopped check and the select; nothing left to do
+                    break;
+                } catch (IOException e) {
+                    logger.error("Error reading from DatagramChannel", e);
+                }
+            }
+        } finally {
+            // always hand the borrowed buffer back, even if an unexpected runtime exception escapes the loop
+            try {
+                bufferPool.put(buffer);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * @return the local port of the datagram socket, or 0 if the channel has not been opened
+     */
+    @Override
+    public int getPort() {
+        return datagramChannel == null ? 0 : datagramChannel.socket().getLocalPort();
+    }
+
+    /**
+     * Signals {@link #run()} to stop, wakes up the selector and closes the selector and the channel.
+     */
+    @Override
+    public void close() {
+        stopped = true;
+        if (selector != null) {
+            selector.wakeup();
+        }
+        IOUtils.closeQuietly(selector);
+        IOUtils.closeQuietly(datagramChannel);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java
new file mode 100644
index 0000000..f2479f1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Immutable holder for the objects attached to a selection key: the buffer used for the
+ * connection and, optionally, the SSLSocketChannel wrapping it.
+ */
+public class SocketChannelAttachment {
+
+    private final ByteBuffer byteBuffer;
+    private final SSLSocketChannel sslSocketChannel;
+
+    /**
+     * @param byteBuffer the buffer to attach
+     * @param sslSocketChannel the SSL channel to attach; stored as given, including null
+     */
+    public SocketChannelAttachment(final ByteBuffer byteBuffer, final SSLSocketChannel sslSocketChannel) {
+        this.sslSocketChannel = sslSocketChannel;
+        this.byteBuffer = byteBuffer;
+    }
+
+    /**
+     * @return the SSL channel given at construction
+     */
+    public SSLSocketChannel getSslSocketChannel() {
+        return this.sslSocketChannel;
+    }
+
+    /**
+     * @return the buffer given at construction
+     */
+    public ByteBuffer getByteBuffer() {
+        return this.byteBuffer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
new file mode 100644
index 0000000..b07fbb9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.security.util.SslContextFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Accepts Socket connections on the given port and creates a handler for each connection to
+ * be executed by a thread pool. Each accepted connection borrows one buffer from the buffer pool
+ * until {@link #completeConnection(SelectionKey)} is called for its key.
+ *
+ * @param <E> the type of event produced by the handlers
+ */
+public class SocketChannelDispatcher<E extends Event<SocketChannel>> implements AsyncChannelDispatcher {
+
+    private final EventFactory<E> eventFactory;
+    private final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory;
+    private final BlockingQueue<ByteBuffer> bufferPool;
+    private final BlockingQueue<E> events;
+    private final ComponentLog logger;
+    private final int maxConnections;
+    private final SSLContext sslContext;
+    private final SslContextFactory.ClientAuth clientAuth;
+    private final Charset charset;
+
+    private ExecutorService executor;
+    private volatile boolean stopped = false;
+    private Selector selector;
+    private final BlockingQueue<SelectionKey> keyQueue;
+    private final AtomicInteger currentConnections = new AtomicInteger(0);
+
+    /**
+     * Creates a dispatcher that requires client authentication whenever an SSLContext is supplied.
+     *
+     * @param eventFactory factory passed to the handlers for creating events
+     * @param handlerFactory factory that creates a handler for each readable connection
+     * @param bufferPool pool holding exactly maxConnections buffers
+     * @param events queue passed to the handlers for created events
+     * @param logger logger for warnings and errors
+     * @param maxConnections maximum number of concurrently open connections
+     * @param sslContext context used to create an SSLEngine per connection, or null for plain sockets
+     * @param charset charset passed to the handlers
+     */
+    public SocketChannelDispatcher(final EventFactory<E> eventFactory,
+                                   final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
+                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final BlockingQueue<E> events,
+                                   final ComponentLog logger,
+                                   final int maxConnections,
+                                   final SSLContext sslContext,
+                                   final Charset charset) {
+        this(eventFactory, handlerFactory, bufferPool, events, logger, maxConnections, sslContext,
+                SslContextFactory.ClientAuth.REQUIRED, charset);
+    }
+
+    /**
+     * @param eventFactory factory passed to the handlers for creating events
+     * @param handlerFactory factory that creates a handler for each readable connection
+     * @param bufferPool pool holding exactly maxConnections buffers
+     * @param events queue passed to the handlers for created events
+     * @param logger logger for warnings and errors
+     * @param maxConnections maximum number of concurrently open connections
+     * @param sslContext context used to create an SSLEngine per connection, or null for plain sockets
+     * @param clientAuth client authentication mode applied to each SSLEngine
+     * @param charset charset passed to the handlers
+     *
+     * @throws IllegalArgumentException if bufferPool is null, empty, or not sized to maxConnections
+     */
+    public SocketChannelDispatcher(final EventFactory<E> eventFactory,
+                                   final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory,
+                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final BlockingQueue<E> events,
+                                   final ComponentLog logger,
+                                   final int maxConnections,
+                                   final SSLContext sslContext,
+                                   final SslContextFactory.ClientAuth clientAuth,
+                                   final Charset charset) {
+        // validate first: a non-positive maxConnections would otherwise fail inside LinkedBlockingQueue
+        // with an IllegalArgumentException that carries no explanation
+        if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) {
+            throw new IllegalArgumentException(
+                    "A pool of available ByteBuffers equal to the maximum number of connections is required");
+        }
+
+        this.eventFactory = eventFactory;
+        this.handlerFactory = handlerFactory;
+        this.bufferPool = bufferPool;
+        this.events = events;
+        this.logger = logger;
+        this.maxConnections = maxConnections;
+        this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
+        this.sslContext = sslContext;
+        this.clientAuth = clientAuth;
+        this.charset = charset;
+    }
+
+    /**
+     * Creates the handler thread pool, opens and binds the server socket and registers it with a new
+     * selector for accepts. The server socket is closed again if any of these steps fails.
+     *
+     * @param nicAddress local address to bind to, or null for the wildcard address
+     * @param port local port to bind to
+     * @param maxBufferSize requested SO_RCVBUF size in bytes; ignored when not positive
+     *
+     * @throws IOException if the channel or selector cannot be opened, configured or bound
+     */
+    @Override
+    public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException {
+        stopped = false;
+        executor = Executors.newFixedThreadPool(maxConnections);
+
+        final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+        try {
+            serverSocketChannel.configureBlocking(false);
+            if (maxBufferSize > 0) {
+                serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, maxBufferSize);
+                final int actualReceiveBufSize = serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+                if (actualReceiveBufSize < maxBufferSize) {
+                    logger.warn("Attempted to set Socket Buffer Size to " + maxBufferSize + " bytes but could only set to "
+                            + actualReceiveBufSize + " bytes. You may want to consider changing the Operating System's "
+                            + "maximum receive buffer");
+                }
+            }
+
+            serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, port));
+
+            selector = Selector.open();
+            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+        } catch (final IOException | RuntimeException e) {
+            // until it is registered with the selector, close() cannot reach the channel, so release it here
+            IOUtils.closeQuietly(serverSocketChannel);
+            throw e;
+        }
+    }
+
+    /**
+     * Selector loop: accepts new connections, hands readable connections to a handler on the thread
+     * pool, and re-enables reads for keys queued through {@link #addBackForSelection(SelectionKey)}.
+     * Runs until {@link #close()} is called.
+     */
+    @Override
+    public void run() {
+        while (!stopped) {
+            try {
+                int selected = selector.select();
+                // if stopped the selector could already be closed which would result in a ClosedSelectorException
+                if (selected > 0 && !stopped) {
+                    Iterator<SelectionKey> selectorKeys = selector.selectedKeys().iterator();
+                    // if stopped we don't want to modify the keys because close() may still be in progress
+                    while (selectorKeys.hasNext() && !stopped) {
+                        SelectionKey key = selectorKeys.next();
+                        selectorKeys.remove();
+                        processSelectedKey(key);
+                    }
+                }
+                // Add back all idle sockets to the select
+                SelectionKey key;
+                while ((key = keyQueue.poll()) != null) {
+                    reEnableRead(key);
+                }
+            } catch (java.nio.channels.ClosedSelectorException e) {
+                // close() closed the selector between the stopped check and the select; nothing left to do
+                break;
+            } catch (IOException e) {
+                logger.error("Error accepting connection from SocketChannel", e);
+            }
+        }
+    }
+
+    /**
+     * Handles one selected key. Failures are confined to this key so the remaining selected keys are
+     * still processed in the same pass.
+     */
+    private void processSelectedKey(final SelectionKey key) {
+        try {
+            if (!key.isValid()) {
+                return;
+            }
+            if (key.isAcceptable()) {
+                acceptConnection(key);
+            } else if (key.isReadable()) {
+                dispatchRead(key);
+            }
+        } catch (java.nio.channels.CancelledKeyException e) {
+            // the key was cancelled (e.g. its channel was closed) after the validity check
+            logger.debug("Ignoring selection key that was cancelled while being processed");
+        } catch (IOException e) {
+            logger.error("Error accepting connection from SocketChannel", e);
+        }
+    }
+
+    /**
+     * Accepts a pending connection, reserves a connection slot and a buffer for it and registers it
+     * for reads. If any step fails, the socket is closed and the slot and buffer are released.
+     */
+    private void acceptConnection(final SelectionKey key) throws IOException {
+        // Handle new connections coming in
+        final ServerSocketChannel channel = (ServerSocketChannel) key.channel();
+        final SocketChannel socketChannel = channel.accept();
+        if (socketChannel == null) {
+            // a non-blocking accept returns null when no connection is pending
+            return;
+        }
+
+        ByteBuffer buffer = null;
+        boolean slotReserved = false;
+        boolean registered = false;
+        try {
+            // Check for available connections
+            if (currentConnections.incrementAndGet() > maxConnections) {
+                currentConnections.decrementAndGet();
+                logger.warn("Rejecting connection from {} because max connections has been met",
+                        new Object[]{ socketChannel.getRemoteAddress().toString() });
+                return;
+            }
+            slotReserved = true;
+            logger.debug("Accepted incoming connection from {}",
+                    new Object[]{socketChannel.getRemoteAddress().toString()});
+
+            // Prepare the byte buffer for the reads, clear it out
+            buffer = bufferPool.poll();
+            if (buffer == null) {
+                logger.error("Rejecting connection from {} because no ByteBuffer is available in the buffer pool",
+                        new Object[]{socketChannel.getRemoteAddress().toString()});
+                return;
+            }
+            buffer.clear();
+            buffer.mark();
+
+            // If we have an SSLContext then create an SSLEngine for the channel
+            SSLSocketChannel sslSocketChannel = null;
+            if (sslContext != null) {
+                final SSLEngine sslEngine = sslContext.createSSLEngine();
+                sslEngine.setUseClientMode(false);
+
+                switch (clientAuth) {
+                    case REQUIRED:
+                        sslEngine.setNeedClientAuth(true);
+                        break;
+                    case WANT:
+                        sslEngine.setWantClientAuth(true);
+                        break;
+                    case NONE:
+                        sslEngine.setNeedClientAuth(false);
+                        sslEngine.setWantClientAuth(false);
+                        break;
+                }
+
+                sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel);
+            }
+
+            // Set socket to non-blocking, and register with selector, attaching the buffer and
+            // SSLSocketChannel to the key
+            socketChannel.configureBlocking(false);
+            final SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
+            readKey.attach(new SocketChannelAttachment(buffer, sslSocketChannel));
+            registered = true;
+        } finally {
+            if (!registered) {
+                // rejected or failed: undo everything that was reserved for this connection
+                IOUtils.closeQuietly(socketChannel);
+                if (buffer != null) {
+                    bufferPool.offer(buffer);
+                }
+                if (slotReserved) {
+                    currentConnections.decrementAndGet();
+                }
+            }
+        }
+    }
+
+    /**
+     * Hands a readable connection to a handler on the thread pool.
+     */
+    private void dispatchRead(final SelectionKey key) {
+        // Clear out the operations the select is interested in until done reading
+        key.interestOps(0);
+        // Create a handler based on the protocol and whether an SSLEngine was provided or not
+        final Runnable handler;
+        if (sslContext != null) {
+            handler = handlerFactory.createSSLHandler(key, this, charset, eventFactory, events, logger);
+        } else {
+            handler = handlerFactory.createHandler(key, this, charset, eventFactory, events, logger);
+        }
+
+        // run the handler
+        executor.execute(handler);
+    }
+
+    /**
+     * Re-enables read interest for a key that was queued for re-selection, skipping keys that have been
+     * cancelled in the meantime.
+     */
+    private void reEnableRead(final SelectionKey key) {
+        try {
+            if (key.isValid()) {
+                key.interestOps(SelectionKey.OP_READ);
+            }
+        } catch (java.nio.channels.CancelledKeyException e) {
+            // cancelled between the validity check and interestOps
+            logger.debug("Ignoring selection key that was cancelled before it could be re-selected");
+        }
+    }
+
+    /**
+     * @return the local port of the listening server socket, or 0 if the dispatcher is not open
+     */
+    @Override
+    public int getPort() {
+        // before open() there is no selector, and after close() keys() would throw ClosedSelectorException
+        if (selector == null || !selector.isOpen()) {
+            return 0;
+        }
+        // Return the port for the key listening for accepts
+        for (SelectionKey key : selector.keys()) {
+            if (key.isValid()) {
+                final Channel channel = key.channel();
+                if (channel instanceof ServerSocketChannel) {
+                    return ((ServerSocketChannel) channel).socket().getLocalPort();
+                }
+            }
+        }
+        return 0;
+    }
+
+    /**
+     * Stops the selector loop, shuts down the handler thread pool (waiting up to one second for running
+     * handlers), then closes every registered channel and the selector.
+     */
+    @Override
+    public void close() {
+        stopped = true;
+        if (selector != null) {
+            selector.wakeup();
+        }
+
+        if (executor != null) {
+            executor.shutdown();
+            try {
+                // Wait a while for existing tasks to terminate
+                if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException ie) {
+                // (Re-)Cancel if current thread also interrupted
+                executor.shutdownNow();
+                // Preserve interrupt status
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        if (selector != null) {
+            synchronized (selector.keys()) {
+                for (SelectionKey key : selector.keys()) {
+                    IOUtils.closeQuietly(key.channel());
+                }
+            }
+        }
+        IOUtils.closeQuietly(selector);
+    }
+
+    /**
+     * Releases the resources held for a finished connection: returns the buffer attached to the key to
+     * the pool and frees the connection slot.
+     *
+     * @param key the selection key of the finished connection
+     */
+    @Override
+    public void completeConnection(SelectionKey key) {
+        // connection is done. Return the buffer to the pool
+        final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment();
+        if (attachment != null && attachment.getByteBuffer() != null) {
+            try {
+                bufferPool.put(attachment.getByteBuffer());
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        currentConnections.decrementAndGet();
+    }
+
+    /**
+     * Queues the key so the selector loop re-enables read interest for it, and wakes the selector up.
+     *
+     * @param key the selection key to select on again
+     */
+    @Override
+    public void addBackForSelection(SelectionKey key) {
+        keyQueue.offer(key);
+        selector.wakeup();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java
new file mode 100644
index 0000000..83989f8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+
+import java.nio.channels.SelectableChannel;
+
+/**
+ * An event that was read from a channel.
+ *
+ * Instances are produced by an {@link EventFactory}, whose create method receives the raw
+ * bytes, a metadata map and a responder.
+ *
+ * @param <C> the type of SelectableChannel the event was read from
+ */
+public interface Event<C extends SelectableChannel> {
+
+    /**
+     * @return the sending host of the data
+     *              (NOTE(review): presumably the value stored under EventFactory.SENDER_KEY
+     *              in the metadata map; confirm against the implementations)
+     */
+    String getSender();
+
+    /**
+     * @return raw data for this event
+     */
+    byte[] getData();
+
+    /**
+     * @return the responder to use for responding to this event, or null
+     *              if responses are not supported
+     */
+    ChannelResponder<C> getResponder();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
new file mode 100644
index 0000000..1bd9f0d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+
+import java.util.Map;
+
+/**
+ * Factory to create instances of a given type of Event.
+ *
+ * NOTE(review): Event and ChannelResponder are referenced as raw types in this interface;
+ * consider Event&lt;?&gt; / ChannelResponder&lt;?&gt; after confirming the impact on implementers.
+ *
+ * @param <E> the type of Event created by this factory
+ */
+public interface EventFactory<E extends Event> {
+
+    /**
+     * The key in the metadata map for the sender.
+     */
+    String SENDER_KEY = "sender";
+
+    /**
+     * Creates an event for the given data and metadata.
+     *
+     * @param data raw data from a channel
+     * @param metadata additional metadata; the sender, when present, is stored under
+     *                 {@link #SENDER_KEY}
+     * @param responder a responder for the event with the channel populated; may be null
+     *                  (DatagramChannelDispatcher passes null)
+     *
+     * @return an instance of the given type
+     */
+    E create(final byte[] data, final Map<String, String> metadata, final 
ChannelResponder responder);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
new file mode 100644
index 0000000..54529cf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static helpers for building the metadata maps handed to an EventFactory.
+ */
+public class EventFactoryUtil {
+
+    /**
+     * Builds a new HashMap holding a single entry that maps EventFactory.SENDER_KEY to the given sender.
+     *
+     * @param sender the sender to record in the metadata
+     * @return a new, mutable map containing only the sender entry
+     */
+    public static Map<String,String> createMapWithSender(final String sender) {
+        final Map<String,String> senderMetadata = new HashMap<>();
+        senderMetadata.put(EventFactory.SENDER_KEY, sender);
+        return senderMetadata;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java
new file mode 100644
index 0000000..35e9ae0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.logging.ComponentLog;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Wraps a BlockingQueue to centralize logic for offering events across UDP, 
TCP, and SSL.
+ *
+ * @param <E> the type of event
+ */
+public class EventQueue<E extends Event> {
+
+    /**
+     * The default number of milliseconds to wait when offering new events to 
the queue.
+     */
+    public static final long DEFAULT_OFFER_WAIT_MS = 100;
+
+    private final long offerWaitMs;
+    private final BlockingQueue<E> events;
+    private final ComponentLog logger;
+
+    public EventQueue(final BlockingQueue<E> events, final ComponentLog 
logger) {
+        this(events, DEFAULT_OFFER_WAIT_MS, logger);
+    }
+
+    public EventQueue(final BlockingQueue<E> events, final long offerWaitMs, 
final ComponentLog logger) {
+        this.events = events;
+        this.offerWaitMs = offerWaitMs;
+        this.logger = logger;
+        Validate.notNull(this.events);
+        Validate.notNull(this.logger);
+    }
+
+    /**
+     * Offers the given event to the events queue with a wait time, if the 
offer fails the event
+     * is dropped an error is logged.
+     *
+     * @param event the event to offer
+     * @throws InterruptedException if interrupted while waiting to offer
+     */
+    public void offer(final E event) throws InterruptedException {
+        boolean queued = events.offer(event, offerWaitMs, 
TimeUnit.MILLISECONDS);
+        if (!queued) {
+            logger.error("Internal queue at maximum capacity, could not queue 
event");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
new file mode 100644
index 0000000..fa3699e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+
+import java.nio.channels.SelectableChannel;
+
+/**
+ * Standard implementation of Event: an immutable holder for a sender, the raw data, and the responder
+ * supplied at construction time.
+ *
+ * @param <C> the type of channel the responder works with
+ */
+public class StandardEvent<C extends SelectableChannel> implements Event<C> {
+
+    private final String sender;
+    private final byte[] data;
+    private final ChannelResponder<C> responder;
+
+    /**
+     * Stores the given values as-is; the data array is kept by reference and not copied.
+     *
+     * @param sender the sender of the data
+     * @param data the raw bytes of the event
+     * @param responder the responder associated with the event
+     */
+    public StandardEvent(final String sender, final byte[] data, final ChannelResponder<C> responder) {
+        this.responder = responder;
+        this.data = data;
+        this.sender = sender;
+    }
+
+    /**
+     * @return the sender given to the constructor
+     */
+    @Override
+    public String getSender() {
+        return this.sender;
+    }
+
+    /**
+     * @return the same byte array given to the constructor
+     */
+    @Override
+    public byte[] getData() {
+        return this.data;
+    }
+
+    /**
+     * @return the responder given to the constructor
+     */
+    public ChannelResponder<C> getResponder() {
+        return this.responder;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
new file mode 100644
index 0000000..9ae6161
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+
+import java.util.Map;
+
+/**
+ * EventFactory that produces StandardEvent instances.
+ */
+public class StandardEventFactory implements EventFactory<StandardEvent> {
+
+    /**
+     * Creates a StandardEvent whose sender is the value stored under EventFactory.SENDER_KEY in the metadata.
+     *
+     * @param data raw data for the event
+     * @param metadata metadata that may contain the sender; may be null
+     * @param responder the responder to attach to the event
+     * @return a new StandardEvent; its sender is null when the metadata is null or has no sender entry
+     */
+    @Override
+    public StandardEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
+        final boolean senderPresent = metadata != null && metadata.containsKey(EventFactory.SENDER_KEY);
+        final String sender = senderPresent ? metadata.get(EventFactory.SENDER_KEY) : null;
+        return new StandardEvent(sender, data, responder);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java
new file mode 100644
index 0000000..84ef062
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.handler;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.EventQueue;
+
+import java.nio.channels.SelectionKey;
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Common base for all channel handlers. It keeps the collaborators a handler needs and wraps the
+ * supplied BlockingQueue in an EventQueue.
+ *
+ * @param <E> the type of event the handler produces
+ * @param <D> the type of dispatcher the handler reports back to
+ */
+public abstract class ChannelHandler<E extends Event, D extends ChannelDispatcher> implements Runnable {
+
+    protected final SelectionKey key;
+    protected final D dispatcher;
+    protected final Charset charset;
+    protected final EventFactory<E> eventFactory;
+    protected final EventQueue<E> events;
+    protected final ComponentLog logger;
+
+    /**
+     * @param key the selection key this handler works with
+     * @param dispatcher the dispatcher that created this handler
+     * @param charset the charset made available to sub-classes
+     * @param eventFactory the factory used to create events
+     * @param events the queue events are offered to; it is wrapped in an EventQueue
+     * @param logger the logger for this handler, also passed to the EventQueue
+     */
+    public ChannelHandler(final SelectionKey key,
+                          final D dispatcher,
+                          final Charset charset,
+                          final EventFactory<E> eventFactory,
+                          final BlockingQueue<E> events,
+                          final ComponentLog logger) {
+        this.logger = logger;
+        this.events = new EventQueue<>(events, logger);
+        this.eventFactory = eventFactory;
+        this.charset = charset;
+        this.dispatcher = dispatcher;
+        this.key = key;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java
new file mode 100644
index 0000000..9ca6bdd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.handler;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+
+import java.nio.channels.SelectionKey;
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Factory that can produce ChannelHandlers for the given type of Event and ChannelDispatcher.
+ *
+ * <p>Two creation methods are declared, createHandler and createSSLHandler. Both take the same arguments
+ * (the SelectionKey, the dispatcher, a Charset, the EventFactory, the BlockingQueue of events and a logger)
+ * and both return a ChannelHandler for the same event and dispatcher types. Presumably createSSLHandler is
+ * meant for connections secured with SSL/TLS and createHandler for all others; this interface does not
+ * itself state that, so confirm against the implementations.</p>
+ *
+ * @param <E> the type of Event the created handlers work with
+ * @param <D> the type of ChannelDispatcher the created handlers work with
+ */
+public interface ChannelHandlerFactory<E extends Event, D extends 
ChannelDispatcher> {
+
+    ChannelHandler<E, D> createHandler(final SelectionKey key,
+                                    final D dispatcher,
+                                    final Charset charset,
+                                    final EventFactory<E> eventFactory,
+                                    final BlockingQueue<E> events,
+                                    final ComponentLog logger);
+
+    ChannelHandler<E, D> createSSLHandler(final SelectionKey key,
+                                       final D dispatcher,
+                                       final Charset charset,
+                                       final EventFactory<E> eventFactory,
+                                       final BlockingQueue<E> events,
+                                       final ComponentLog logger);
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
new file mode 100644
index 0000000..ef747e1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.handler.socket;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import 
org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Handler that receives messages over TLS by reading from the SSLSocketChannel held in the
+ * SocketChannelAttachment of the SelectionKey.
+ *
+ * <p>Bytes are accumulated until the delimiter returned by getDelimiter() is seen; each non-empty run of
+ * bytes between delimiters is turned into an event through the EventFactory and offered to the event queue.
+ * When reading ends, the key is either handed back to the dispatcher for selection or, on end of stream or
+ * error, the SSL channel is closed and the dispatcher is told the connection is complete.</p>
+ *
+ * @param <E> the type of event created for each message
+ */
+public class SSLSocketChannelHandler<E extends Event<SocketChannel>> extends 
SocketChannelHandler<E> {
+
+    private final ByteArrayOutputStream currBytes = new 
ByteArrayOutputStream(4096);
+
+    public SSLSocketChannelHandler(final SelectionKey key,
+                                   final AsyncChannelDispatcher dispatcher,
+                                   final Charset charset,
+                                   final EventFactory<E> eventFactory,
+                                   final BlockingQueue<E> events,
+                                   final ComponentLog logger) {
+        super(key, dispatcher, charset, eventFactory, events, logger);
+    }
+
+    @Override
+    public void run() {
+        boolean eof = false;
+        SSLSocketChannel sslSocketChannel = null;
+        try {
+            int bytesRead;
+            final SocketChannel socketChannel = (SocketChannel) key.channel();
+            final SocketChannelAttachment attachment = 
(SocketChannelAttachment) key.attachment();
+
+            // the attachment carries the SSLSocketChannel that all reads below go through
+            sslSocketChannel = attachment.getSslSocketChannel();
+
+            // SSLSocketChannel deals with byte[] so ByteBuffer isn't used 
here, but we'll use the size to create a new byte[]
+            final ByteBuffer socketBuffer = attachment.getByteBuffer();
+            byte[] socketBufferArray = new byte[socketBuffer.limit()];
+
+            // read until no more data; a read count of zero or less ends the loop
+            try {
+                while ((bytesRead = sslSocketChannel.read(socketBufferArray)) 
> 0) {
+                    processBuffer(sslSocketChannel, socketChannel, bytesRead, 
socketBufferArray);
+                    logger.debug("bytes read from sslSocketChannel {}", new 
Object[]{bytesRead});
+                }
+            } catch (SocketTimeoutException ste) {
+                // SSLSocketChannel will throw this exception when 0 bytes are 
read and the timeout threshold
+                // is exceeded, we don't want to close the connection in this 
case
+                bytesRead = 0;
+            }
+
+            // Check for closed socket: a negative read count is treated as end of stream
+            if( bytesRead < 0 ){
+                eof = true;
+                logger.debug("Reached EOF, closing connection");
+            } else {
+                logger.debug("No more data available, returning for 
selection");
+            }
+        } catch (ClosedByInterruptException | InterruptedException e) {
+            logger.debug("read loop interrupted, closing connection");
+            // Treat same as closed socket. NOTE(review): the interrupt status is not restored after an InterruptedException here - confirm this is intended
+            eof = true;
+        } catch (ClosedChannelException e) {
+            // ClosedChannelException doesn't have a message so handle it 
separately from IOException
+            logger.error("Error reading from channel due to channel being 
closed", e);
+            // Treat same as closed socket
+            eof = true;
+        } catch (IOException e) {
+            logger.error("Error reading from channel due to {}", new Object[] 
{e.getMessage()}, e);
+            // Treat same as closed socket
+            eof = true;
+        } finally {
+            if(eof == true) {
+                IOUtils.closeQuietly(sslSocketChannel);
+                dispatcher.completeConnection(key);
+            } else {
+                dispatcher.addBackForSelection(key);
+            }
+        }
+    }
+
+    /**
+     * Process the contents of the buffer. Give sub-classes a chance to override this behavior.
+     *
+     * <p>This implementation scans the first bytesRead bytes of the buffer. Bytes other than the delimiter
+     * are appended to an internal accumulator; when the delimiter is found and the accumulator is not empty,
+     * an event is created from the accumulated bytes, offered to the event queue, and the accumulator is
+     * reset. Bytes after the last delimiter stay in the accumulator when the method returns.</p>
+     *
+     * @param sslSocketChannel the channel the data was read from
+     * @param socketChannel the socket channel being wrapped by sslSocketChannel
+     * @param bytesRead the number of bytes read
+     * @param buffer the buffer to process
+     * @throws InterruptedException thrown if interrupted while queuing events
+     * @throws IOException declared in the signature; presumably for overriding sub-classes or the responder,
+     *                     since no statement here obviously throws it (confirm)
+     */
+    protected void processBuffer(final SSLSocketChannel sslSocketChannel, 
final SocketChannel socketChannel,
+                                 final int bytesRead, final byte[] buffer) 
throws InterruptedException, IOException {
+        final InetAddress sender = socketChannel.socket().getInetAddress();
+
+        // go through the buffer looking for the end of each message
+        for (int i = 0; i < bytesRead; i++) {
+            final byte currByte = buffer[i];
+
+            // check if at end of a message
+            if (currByte == getDelimiter()) {
+                if (currBytes.size() > 0) {
+                    final SSLSocketChannelResponder response = new 
SSLSocketChannelResponder(socketChannel, sslSocketChannel);
+                    final Map<String, String> metadata = 
EventFactoryUtil.createMapWithSender(sender.toString());
+                    final E event = 
eventFactory.create(currBytes.toByteArray(), metadata, response);
+                    events.offer(event);
+                    currBytes.reset();
+                }
+            } else {
+                currBytes.write(currByte);
+            }
+        }
+    }
+
+    @Override
+    public byte getDelimiter() {
+        return TCP_DELIMITER;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java
new file mode 100644
index 0000000..07b5dcc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.handler.socket;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Base class for socket channel handlers.
+ *
+ * <p>Fixes the dispatcher type of ChannelHandler to AsyncChannelDispatcher, passes all constructor
+ * arguments through to ChannelHandler unchanged, and defines the package-private TCP_DELIMITER constant
+ * (the newline byte) that sub-classes in this package can return from getDelimiter().</p>
+ *
+ * @param <E> the type of event handled, restricted to events over a SocketChannel
+ */
+public abstract class SocketChannelHandler<E extends Event<SocketChannel>> 
extends ChannelHandler<E, AsyncChannelDispatcher> {
+
+    static final byte TCP_DELIMITER = '\n';
+
+    public SocketChannelHandler(final SelectionKey key,
+                                final AsyncChannelDispatcher dispatcher,
+                                final Charset charset,
+                                final EventFactory<E> eventFactory,
+                                final BlockingQueue<E> events,
+                                final ComponentLog logger) {
+        super(key, dispatcher, charset, eventFactory, events, logger);
+    }
+
+    /**
+     * @return the byte used as the delimiter between messages for the given handler
+     */
+    public abstract byte getDelimiter();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java
new file mode 100644
index 0000000..9003f90
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.handler.socket;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Default ChannelHandlerFactory for socket channels: plain handlers are StandardSocketChannelHandler
+ * instances and SSL handlers are SSLSocketChannelHandler instances.
+ *
+ * @param <E> the type of event the created handlers produce
+ */
+public class SocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
+
+    /**
+     * @return a new StandardSocketChannelHandler built from the given arguments
+     */
+    @Override
+    public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key,
+                                           final AsyncChannelDispatcher dispatcher,
+                                           final Charset charset,
+                                           final EventFactory<E> eventFactory,
+                                           final BlockingQueue<E> events,
+                                           final ComponentLog logger) {
+        final ChannelHandler<E, AsyncChannelDispatcher> plainHandler =
+                new StandardSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
+        return plainHandler;
+    }
+
+    /**
+     * @return a new SSLSocketChannelHandler built from the given arguments
+     */
+    @Override
+    public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key,
+                                              final AsyncChannelDispatcher dispatcher,
+                                              final Charset charset,
+                                              final EventFactory<E> eventFactory,
+                                              final BlockingQueue<E> events,
+                                              final ComponentLog logger) {
+        final ChannelHandler<E, AsyncChannelDispatcher> sslHandler =
+                new SSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
+        return sslHandler;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
new file mode 100644
index 0000000..250168c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.handler.socket;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import 
org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Reads from the given SocketChannel into the provided buffer. If the given 
delimiter is found, the data
+ * read up to that point is queued for processing.
+ */
+public class StandardSocketChannelHandler<E extends Event<SocketChannel>> 
extends SocketChannelHandler<E> {
+
+    private final ByteArrayOutputStream currBytes = new 
ByteArrayOutputStream(4096);
+
+    public StandardSocketChannelHandler(final SelectionKey key,
+                                        final AsyncChannelDispatcher 
dispatcher,
+                                        final Charset charset,
+                                        final EventFactory<E> eventFactory,
+                                        final BlockingQueue<E> events,
+                                        final ComponentLog logger) {
+        super(key, dispatcher, charset, eventFactory, events, logger);
+    }
+
+    @Override
+    public void run() {
+        boolean eof = false;
+        SocketChannel socketChannel = null;
+
+        try {
+            int bytesRead;
+            socketChannel = (SocketChannel) key.channel();
+
+            final SocketChannelAttachment attachment = 
(SocketChannelAttachment) key.attachment();
+            final ByteBuffer socketBuffer = attachment.getByteBuffer();
+
+            // read until the buffer is full
+            while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
+                // prepare byte buffer for reading
+                socketBuffer.flip();
+                // mark the current position as start, in case of partial 
message read
+                socketBuffer.mark();
+                // process the contents that have been read into the buffer
+                processBuffer(socketChannel, socketBuffer);
+
+                // Preserve bytes in buffer for next call to run
+                // NOTE: This code could benefit from the  two ByteBuffer read 
calls to avoid
+                // this compact for higher throughput
+                socketBuffer.reset();
+                socketBuffer.compact();
+                logger.debug("bytes read {}", new Object[]{bytesRead});
+            }
+
+            // Check for closed socket
+            if( bytesRead < 0 ){
+                eof = true;
+                logger.debug("Reached EOF, closing connection");
+            } else {
+                logger.debug("No more data available, returning for 
selection");
+            }
+        } catch (ClosedByInterruptException | InterruptedException e) {
+            logger.debug("read loop interrupted, closing connection");
+            // Treat same as closed socket
+            eof = true;
+        } catch (ClosedChannelException e) {
+            // ClosedChannelException doesn't have a message so handle it 
separately from IOException
+            logger.error("Error reading from channel due to channel being 
closed", e);
+            // Treat same as closed socket
+            eof = true;
+        } catch (IOException e) {
+            logger.error("Error reading from channel due to {}", new Object[] 
{e.getMessage()}, e);
+            // Treat same as closed socket
+            eof = true;
+        } finally {
+            if(eof == true) {
+                IOUtils.closeQuietly(socketChannel);
+                dispatcher.completeConnection(key);
+            } else {
+                dispatcher.addBackForSelection(key);
+            }
+        }
+    }
+
+    /**
+     * Process the contents that have been read into the buffer. Allow 
sub-classes to override this behavior.
+     *
+     * @param socketChannel the channel the data was read from
+     * @param socketBuffer the buffer the data was read into
+     * @throws InterruptedException if interrupted when queuing events
+     */
+    protected void processBuffer(final SocketChannel socketChannel, final 
ByteBuffer socketBuffer) throws InterruptedException, IOException {
+        // get total bytes in buffer
+        final int total = socketBuffer.remaining();
+        final InetAddress sender = socketChannel.socket().getInetAddress();
+
+        // go through the buffer looking for the end of each message
+        currBytes.reset();
+        for (int i = 0; i < total; i++) {
+            // NOTE: For higher throughput, the looking for \n and copying 
into the byte stream could be improved
+            // Pull data out of buffer and cram into byte array
+            byte currByte = socketBuffer.get();
+
+            // check if at end of a message
+            if (currByte == getDelimiter()) {
+                if (currBytes.size() > 0) {
+                    final SocketChannelResponder response = new 
SocketChannelResponder(socketChannel);
+                    final Map<String, String> metadata = 
EventFactoryUtil.createMapWithSender(sender.toString());
+                    final E event = 
eventFactory.create(currBytes.toByteArray(), metadata, response);
+                    events.offer(event);
+                    currBytes.reset();
+
+                    // Mark this as the start of the next message
+                    socketBuffer.mark();
+                }
+            } else {
+                currBytes.write(currByte);
+            }
+        }
+    }
+
+    @Override
+    public byte getDelimiter() {
+        return TCP_DELIMITER;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java
new file mode 100644
index 0000000..978f3ac
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.response;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.util.List;
+
+/**
+ * A responder for a given channel. Responses are collected with addResponse and written to the
+ * channel when respond is called.
+ *
+ * @param <C> The type of SelectableChannel where the response will be written.
+ */
+public interface ChannelResponder<C extends SelectableChannel> {
+
+    /**
+     * @return a SelectableChannel to write the response to
+     */
+    C getChannel();
+
+    /**
+     * @return a list of responses to write to the channel
+     */
+    List<ChannelResponse> getResponses();
+
+    /**
+     * Adds the given response to the list of responses.
+     *
+     * @param response the response to add
+     */
+    void addResponse(ChannelResponse response);
+
+    /**
+     * Writes the responses to the underlying channel.
+     *
+     * @throws IOException if the responses cannot be written
+     */
+    void respond() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java
new file mode 100644
index 0000000..98f0301
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.response;
+
+/**
+ * A response to send back over a channel.
+ */
+public interface ChannelResponse {
+
+    /**
+     * @return the bytes that should be written to a channel for this response
+     */
+    byte[] toByteArray();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
new file mode 100644
index 0000000..20102ba
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.response.socket;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+/**
+ * A ChannelResponder for SSLSocketChannels. Responses are written through the SSLSocketChannel
+ * instead of directly to the SocketChannel.
+ */
+public class SSLSocketChannelResponder extends SocketChannelResponder {
+
+    // assigned once in the constructor and never replaced
+    private final SSLSocketChannel sslSocketChannel;
+
+    /**
+     * @param socketChannel the channel handed to the parent SocketChannelResponder
+     * @param sslSocketChannel the SSL channel that respond() writes to
+     */
+    public SSLSocketChannelResponder(final SocketChannel socketChannel, final SSLSocketChannel sslSocketChannel) {
+        super(socketChannel);
+        this.sslSocketChannel = sslSocketChannel;
+    }
+
+    /**
+     * Writes the bytes of each added response, in list order, to the SSL channel.
+     *
+     * @throws IOException propagated from the write to the SSL channel
+     */
+    @Override
+    public void respond() throws IOException {
+        for (final ChannelResponse response : responses) {
+            sslSocketChannel.write(response.toByteArray());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
new file mode 100644
index 0000000..5c20bf0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.response.socket;
+
+import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A ChannelResponder backed by a SocketChannel. The intended use is to register the SocketChannel
+ * with a selector and to call respond() once the channel has been selected for writing.
+ */
+public class SocketChannelResponder implements ChannelResponder<SocketChannel> {
+
+    protected final List<ChannelResponse> responses = new ArrayList<>();
+    protected final SocketChannel socketChannel;
+
+    /**
+     * @param socketChannel the channel that responses are written to
+     */
+    public SocketChannelResponder(final SocketChannel socketChannel) {
+        this.socketChannel = socketChannel;
+    }
+
+    /**
+     * @return the channel given at construction
+     */
+    @Override
+    public SocketChannel getChannel() {
+        return socketChannel;
+    }
+
+    /**
+     * @return a read-only view of the responses added so far
+     */
+    @Override
+    public List<ChannelResponse> getResponses() {
+        final List<ChannelResponse> readOnlyView = Collections.unmodifiableList(responses);
+        return readOnlyView;
+    }
+
+    /**
+     * Appends the given response to the list of responses.
+     *
+     * @param response the response to append
+     */
+    @Override
+    public void addResponse(ChannelResponse response) {
+        responses.add(response);
+    }
+
+    /**
+     * Writes every added response, in list order, to the channel.
+     *
+     * @throws IOException propagated from the channel write
+     */
+    @Override
+    public void respond() throws IOException {
+        for (final ChannelResponse queued : responses) {
+            final byte[] payload = queued.toByteArray();
+
+            // a single write may take only part of the buffer, so repeat until nothing is left
+            for (final ByteBuffer pending = ByteBuffer.wrap(payload); pending.hasRemaining(); ) {
+                socketChannel.write(pending);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
new file mode 100644
index 0000000..f97f31d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+/**
+ * Signals that a looping process was discontinued.
+ * A caller that receives this exception should stop immediately and not process any further inputs.
+ */
+public class DiscontinuedException extends RuntimeException {
+
+    /**
+     * @param message the detail message
+     */
+    public DiscontinuedException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public DiscontinuedException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
new file mode 100644
index 0000000..c6cf140
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.pattern;
+
+import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure;
+import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.ProcessException;
+import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Retry;
+import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Self;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.None;
+import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Penalize;
+import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Yield;
+
+/**
+ * General error types, each pairing a destination for the input with a penalty to apply.
+ */
+public enum ErrorTypes {
+
+    /**
+     * The procedure's settings have to be fixed, otherwise the same error would occur regardless of the input.
+     * In order NOT to call the failing process frequently, this should be yielded.
+     */
+    PersistentFailure(ProcessException, Yield),
+
+    /**
+     * It is unknown whether the error is persistent or temporal, or whether it is related to the input.
+     */
+    UnknownFailure(ProcessException, None),
+
+    /**
+     * The input will be sent to the failure route for recovery without penalizing.
+     * Basically, the input should not be sent to the same procedure again unless the issue has been solved.
+     */
+    InvalidInput(Failure, None),
+
+    /**
+     * The procedure is temporarily unavailable, usually because an external service is unavailable.
+     * Retrying may be successful, but it should be yielded for a while.
+     */
+    TemporalFailure(Retry, Yield),
+
+    /**
+     * The input was not processed successfully due to some temporal error
+     * related to the specifics of the input. Retrying may be successful,
+     * but it should be penalized for a while.
+     */
+    TemporalInputFailure(Retry, Penalize),
+
+    /**
+     * The input was not ready for being processed. It will be kept in the incoming queue and also be penalized.
+     */
+    Defer(Self, Penalize);
+
+    /**
+     * Where the input should go.
+     */
+    public enum Destination {
+        ProcessException, Failure, Retry, Self
+    }
+
+    /**
+     * Whether to yield, penalize, or do neither when the input is transferred.
+     */
+    public enum Penalty {
+        Yield, Penalize, None
+    }
+
+    private final Destination destination;
+    private final Penalty penalty;
+
+    ErrorTypes(Destination destination, Penalty penalty) {
+        this.destination = destination;
+        this.penalty = penalty;
+    }
+
+    /**
+     * @return the destination paired with this error type
+     */
+    public Destination destination() {
+        return destination;
+    }
+
+    /**
+     * @return the penalty paired with this error type
+     */
+    public Penalty penalty() {
+        return penalty;
+    }
+
+    /**
+     * @return a new Result holding this error type's destination and penalty
+     */
+    public Result result() {
+        return new Result(destination, penalty);
+    }
+
+    /**
+     * The result of a procedure: a destination together with a penalty.
+     * The ErrorTypes constants provide the basic error result patterns.
+     */
+    public static class Result {
+        private final Destination destination;
+        private final Penalty penalty;
+
+        public Result(Destination destination, Penalty penalty) {
+            this.destination = destination;
+            this.penalty = penalty;
+        }
+
+        public Destination destination() {
+            return destination;
+        }
+
+        public Penalty penalty() {
+            return penalty;
+        }
+
+        @Override
+        public String toString() {
+            return "Result{destination=" + destination + ", penalty=" + penalty + "}";
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            // exact class match, so an instance of a sub-class never equals a plain Result
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final Result other = (Result) o;
+            return destination == other.destination && penalty == other.penalty;
+        }
+
+        @Override
+        public int hashCode() {
+            final int destinationHash = destination == null ? 0 : destination.hashCode();
+            final int penaltyHash = penalty == null ? 0 : penalty.hashCode();
+            return 31 * destinationHash + penaltyHash;
+        }
+    }
+
+}

Reply via email to