http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
deleted file mode 100644
index 9a97671..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen;
-
-import static 
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
-
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processor.util.listen.event.Event;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * An abstract processor that extends from AbstractListenEventProcessor and 
adds common functionality for
- * batching events into a single FlowFile.
- *
- * @param <E> the type of Event
- */
-public abstract class AbstractListenEventBatchingProcessor<E extends Event> 
extends AbstractListenEventProcessor<E> {
-
-    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Max Batch Size")
-            .description(
-                    "The maximum number of messages to add to a single 
FlowFile. If multiple messages are available, they will be concatenated along 
with "
-                            + "the <Message Delimiter> up to this configured 
maximum number of messages")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(false)
-            .defaultValue("1")
-            .required(true)
-            .build();
-    public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
-            .name("Message Delimiter")
-            .displayName("Batching Message Delimiter")
-            .description("Specifies the delimiter to place between messages 
when multiple messages are bundled together (see <Max Batch Size> property).")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("\\n")
-            .required(true)
-            .build();
-
-    // it is only the array reference that is volatile - not the contents.
-    protected volatile byte[] messageDemarcatorBytes;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(NETWORK_INTF_NAME);
-        descriptors.add(PORT);
-        descriptors.add(RECV_BUFFER_SIZE);
-        descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
-        descriptors.add(MAX_SOCKET_BUFFER_SIZE);
-        descriptors.add(CHARSET);
-        descriptors.add(MAX_BATCH_SIZE);
-        descriptors.add(MESSAGE_DELIMITER);
-        descriptors.addAll(getAdditionalProperties());
-        this.descriptors = Collections.unmodifiableList(descriptors);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.addAll(getAdditionalRelationships());
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) throws IOException {
-        super.onScheduled(context);
-        final String msgDemarcator = 
context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", 
"\n").replace("\\r", "\r").replace("\\t", "\t");
-        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
-    }
-
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final int maxBatchSize = 
context.getProperty(MAX_BATCH_SIZE).asInteger();
-        final Map<String,FlowFileEventBatch> batches = getBatches(session, 
maxBatchSize, messageDemarcatorBytes);
-
-        // if the size is 0 then there was nothing to process so return
-        // we don't need to yield here because we have a long poll in side of 
getBatches
-        if (batches.size() == 0) {
-            return;
-        }
-
-        final List<E> allEvents = new ArrayList<>();
-
-        for (Map.Entry<String,FlowFileEventBatch> entry : batches.entrySet()) {
-            FlowFile flowFile = entry.getValue().getFlowFile();
-            final List<E> events = entry.getValue().getEvents();
-
-            if (flowFile.getSize() == 0L || events.size() == 0) {
-                session.remove(flowFile);
-                getLogger().debug("No data written to FlowFile from batch {}; 
removing FlowFile", new Object[] {entry.getKey()});
-                continue;
-            }
-
-            final Map<String,String> attributes = 
getAttributes(entry.getValue());
-            flowFile = session.putAllAttributes(flowFile, attributes);
-
-            getLogger().debug("Transferring {} to success", new Object[] 
{flowFile});
-            session.transfer(flowFile, REL_SUCCESS);
-            session.adjustCounter("FlowFiles Transferred to Success", 1L, 
false);
-
-            // the sender and command will be the same for all events based on 
the batch key
-            final String transitUri = getTransitUri(entry.getValue());
-            session.getProvenanceReporter().receive(flowFile, transitUri);
-
-            allEvents.addAll(events);
-        }
-
-        // let sub-classes take any additional actions
-        postProcess(context, session, allEvents);
-    }
-
-    /**
-     * Creates the attributes for the FlowFile of the given batch.
-     *
-     * @param batch the current batch
-     * @return the Map of FlowFile attributes
-     */
-    protected abstract Map<String,String> getAttributes(final 
FlowFileEventBatch batch);
-
-    /**
-     * Creates the transit uri to be used when reporting a provenance receive 
event for the given batch.
-     *
-     * @param batch the current batch
-     * @return the transit uri string
-     */
-    protected abstract String getTransitUri(final FlowFileEventBatch batch);
-
-    /**
-     * Called at the end of onTrigger to allow sub-classes to take post 
processing action on the events
-     *
-     * @param context the current context
-     * @param session the current session
-     * @param events the list of all events processed by the current execution 
of onTrigger
-     */
-    protected void postProcess(ProcessContext context, ProcessSession session, 
final List<E> events) {
-        // empty implementation so sub-classes only have to override if 
necessary
-    }
-
-    /**
-     * Batches together up to the batchSize events. Events are grouped 
together based on a batch key which
-     * by default is the sender of the event, but can be override by 
sub-classes.
-     *
-     * This method will return when batchSize has been reached, or when no 
more events are available on the queue.
-     *
-     * @param session the current session
-     * @param totalBatchSize the total number of events to process
-     * @param messageDemarcatorBytes the demarcator to put between messages 
when writing to a FlowFile
-     *
-     * @return a Map from the batch key to the FlowFile and events for that 
batch, the size of events in all
-     *              the batches will be <= batchSize
-     */
-    protected Map<String,FlowFileEventBatch> getBatches(final ProcessSession 
session, final int totalBatchSize,
-                                                        final byte[] 
messageDemarcatorBytes) {
-
-        final Map<String,FlowFileEventBatch> batches = new HashMap<>();
-        for (int i=0; i < totalBatchSize; i++) {
-            final E event = getMessage(true, true, session);
-            if (event == null) {
-                break;
-            }
-
-            final String batchKey = getBatchKey(event);
-            FlowFileEventBatch batch = batches.get(batchKey);
-
-            // if we don't have a batch for this key then create a new one
-            if (batch == null) {
-                batch = new FlowFileEventBatch(session.create(), new 
ArrayList<E>());
-                batches.put(batchKey, batch);
-            }
-
-            // add the current event to the batch
-            batch.getEvents().add(event);
-
-            // append the event's data to the FlowFile, write the demarcator 
first if not on the first event
-            final boolean writeDemarcator = (i > 0);
-            try {
-                final byte[] rawMessage = event.getData();
-                FlowFile appendedFlowFile = 
session.append(batch.getFlowFile(), new OutputStreamCallback() {
-                    @Override
-                    public void process(final OutputStream out) throws 
IOException {
-                        if (writeDemarcator) {
-                            out.write(messageDemarcatorBytes);
-                        }
-
-                        out.write(rawMessage);
-                    }
-                });
-
-                // update the FlowFile reference in the batch object
-                batch.setFlowFile(appendedFlowFile);
-
-            } catch (final Exception e) {
-                getLogger().error("Failed to write contents of the message to 
FlowFile due to {}; will re-queue message and try again",
-                        new Object[] {e.getMessage()}, e);
-                errorEvents.offer(event);
-                break;
-            }
-        }
-
-        return batches;
-    }
-
-    /**
-     * @param event an event that was pulled off the queue
-     *
-     * @return a key to use for batching events together, by default this uses 
the sender of the
-     *              event, but sub-classes should override this to batch by 
something else
-     */
-    protected String getBatchKey(final E event) {
-        return event.getSender();
-    }
-
-    /**
-     * Wrapper to hold a FlowFile and the events that have been appended to it.
-     */
-    protected final class FlowFileEventBatch {
-
-        private FlowFile flowFile;
-        private List<E> events;
-
-        public FlowFileEventBatch(final FlowFile flowFile, final List<E> 
events) {
-            this.flowFile = flowFile;
-            this.events = events;
-        }
-
-        public FlowFile getFlowFile() {
-            return flowFile;
-        }
-
-        public List<E> getEvents() {
-            return events;
-        }
-
-        public void setFlowFile(FlowFile flowFile) {
-            this.flowFile = flowFile;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
deleted file mode 100644
index 43d01b8..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen;
-
-import static 
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- * An abstract processor to extend from when listening for events over a 
channel. This processor
- * will start a ChannelDispatcher, and optionally a ChannelResponseDispatcher, 
in a background
- * thread which will end up placing events on a queue to polled by the 
onTrigger method. Sub-classes
- * are responsible for providing the dispatcher implementations.
- *
- * @param <E> the type of events being produced
- */
-public abstract class AbstractListenEventProcessor<E extends Event> extends 
AbstractProcessor {
-
-
-
-    public static final PropertyDescriptor PORT = new PropertyDescriptor
-            .Builder().name("Port")
-            .description("The port to listen on for communication.")
-            .required(true)
-            .addValidator(StandardValidators.PORT_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
-            .name("Character Set")
-            .description("Specifies the character set of the received data.")
-            .required(true)
-            .defaultValue("UTF-8")
-            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor RECV_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Receive Buffer Size")
-            .description("The size of each buffer used to receive messages. 
Adjust this value appropriately based on the expected size of the " +
-                    "incoming messages.")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("65507 B")
-            .required(true)
-            .build();
-    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Max Size of Socket Buffer")
-            .description("The maximum size of the socket buffer that should be 
used. This is a suggestion to the Operating System " +
-                    "to indicate how big the socket buffer should be. If this 
value is set too low, the buffer may fill up before " +
-                    "the data can be read, and incoming data will be dropped.")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .required(true)
-            .build();
-    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new 
PropertyDescriptor.Builder()
-            .name("Max Size of Message Queue")
-            .description("The maximum size of the internal queue used to 
buffer messages being transferred from the underlying channel to the processor. 
" +
-                    "Setting this value higher allows more messages to be 
buffered in memory during surges of incoming messages, but increases the total 
" +
-                    "memory used by the processor.")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .defaultValue("10000")
-            .required(true)
-            .build();
-
-    // Putting these properties here so sub-classes don't have to redefine 
them, but they are
-    // not added to the properties by default since not all processors may 
need them
-
-    public static final PropertyDescriptor MAX_CONNECTIONS = new 
PropertyDescriptor.Builder()
-            .name("Max Number of TCP Connections")
-            .description("The maximum number of concurrent TCP connections to 
accept.")
-            .addValidator(StandardValidators.createLongValidator(1, 65535, 
true))
-            .defaultValue("2")
-            .required(true)
-            .build();
-
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Messages received successfully will be sent out this 
relationship.")
-            .build();
-
-    public static final int POLL_TIMEOUT_MS = 20;
-
-    protected Set<Relationship> relationships;
-    protected List<PropertyDescriptor> descriptors;
-
-    protected volatile int port;
-    protected volatile Charset charset;
-    protected volatile ChannelDispatcher dispatcher;
-    protected volatile BlockingQueue<E> events;
-    protected volatile BlockingQueue<E> errorEvents = new 
LinkedBlockingQueue<>();
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(NETWORK_INTF_NAME);
-        descriptors.add(PORT);
-        descriptors.add(RECV_BUFFER_SIZE);
-        descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
-        descriptors.add(MAX_SOCKET_BUFFER_SIZE);
-        descriptors.add(CHARSET);
-        descriptors.addAll(getAdditionalProperties());
-        this.descriptors = Collections.unmodifiableList(descriptors);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        relationships.addAll(getAdditionalRelationships());
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    /**
-     * Override to provide additional relationships for the processor.
-     *
-     * @return a list of relationships
-     */
-    protected List<Relationship> getAdditionalRelationships() {
-        return Collections.EMPTY_LIST;
-    }
-
-    /**
-     * Override to provide additional properties for the processor.
-     *
-     * @return a list of properties
-     */
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Collections.EMPTY_LIST;
-    }
-
-    @Override
-    public final Set<Relationship> getRelationships() {
-        return this.relationships;
-    }
-
-    @Override
-    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
-    }
-
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException {
-        charset = Charset.forName(context.getProperty(CHARSET).getValue());
-        port = context.getProperty(PORT).asInteger();
-        events = new 
LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
-
-        final String nicIPAddressStr = 
context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
-        final int maxChannelBufferSize = 
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-
-        InetAddress nicIPAddress = null;
-        if (!StringUtils.isEmpty(nicIPAddressStr)) {
-            NetworkInterface netIF = 
NetworkInterface.getByName(nicIPAddressStr);
-            nicIPAddress = netIF.getInetAddresses().nextElement();
-        }
-
-        // create the dispatcher and call open() to bind to the given port
-        dispatcher = createDispatcher(context, events);
-        dispatcher.open(nicIPAddress, port, maxChannelBufferSize);
-
-        // start a thread to run the dispatcher
-        final Thread readerThread = new Thread(dispatcher);
-        readerThread.setName(getClass().getName() + " [" + getIdentifier() + 
"]");
-        readerThread.setDaemon(true);
-        readerThread.start();
-    }
-
-    /**
-     * @param context the ProcessContext to retrieve property values from
-     * @return a ChannelDispatcher to handle incoming connections
-     *
-     * @throws IOException if unable to listen on the requested port
-     */
-    protected abstract ChannelDispatcher createDispatcher(final ProcessContext 
context, final BlockingQueue<E> events) throws IOException;
-
-    // used for testing to access the random port that was selected
-    public final int getDispatcherPort() {
-        return dispatcher == null ? 0 : dispatcher.getPort();
-    }
-
-    public int getErrorQueueSize() {
-        return errorEvents.size();
-    }
-
-    public int getQueueSize() {
-        return events == null ? 0 : events.size();
-    }
-
-    @OnUnscheduled
-    public void onUnscheduled() {
-        if (dispatcher != null) {
-            dispatcher.close();
-        }
-    }
-
-    /**
-     * Creates a pool of ByteBuffers with the given size.
-     *
-     * @param poolSize the number of buffers to initialize the pool with
-     * @param bufferSize the size of each buffer
-     * @return a blocking queue with size equal to poolSize and each buffer 
equal to bufferSize
-     */
-    protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, 
final int bufferSize) {
-        final LinkedBlockingQueue<ByteBuffer> bufferPool = new 
LinkedBlockingQueue<>(poolSize);
-        for (int i = 0; i < poolSize; i++) {
-            bufferPool.offer(ByteBuffer.allocate(bufferSize));
-        }
-        return bufferPool;
-    }
-
-    /**
-     * If pollErrorQueue is true, the error queue will be checked first and 
event will be
-     * returned from the error queue if available.
-     *
-     * If pollErrorQueue is false, or no data is in the error queue, the 
regular queue is polled.
-     *
-     * If longPoll is true, the regular queue will be polled with a short 
timeout, otherwise it will
-     * poll with no timeout which will return immediately.
-     *
-     * @param longPoll whether or not to poll the main queue with a small 
timeout
-     * @param pollErrorQueue whether or not to poll the error queue first
-     *
-     * @return an event from one of the queues, or null if none are available
-     */
-    protected E getMessage(final boolean longPoll, final boolean 
pollErrorQueue, final ProcessSession session) {
-        E event = null;
-        if (pollErrorQueue) {
-            event = errorEvents.poll();
-        }
-
-        if (event == null) {
-            try {
-                if (longPoll) {
-                    event = events.poll(POLL_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
-                } else {
-                    event = events.poll();
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                return null;
-            }
-        }
-
-        if (event != null) {
-            session.adjustCounter("Messages Received", 1L, false);
-        }
-
-        return event;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
deleted file mode 100644
index 5e4c639..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.expression.AttributeExpression;
-
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Shared properties.
- */
-public class ListenerProperties {
-
-    private static final Set<String> interfaceSet = new HashSet<>();
-
-    static {
-        try {
-            final Enumeration<NetworkInterface> interfaceEnum = 
NetworkInterface.getNetworkInterfaces();
-            while (interfaceEnum.hasMoreElements()) {
-                final NetworkInterface ifc = interfaceEnum.nextElement();
-                interfaceSet.add(ifc.getName());
-            }
-        } catch (SocketException e) {
-        }
-    }
-
-    public static final PropertyDescriptor NETWORK_INTF_NAME = new 
PropertyDescriptor.Builder()
-            .name("Local Network Interface")
-            .description("The name of a local network interface to be used to 
restrict listening to a specific LAN.")
-            .addValidator(new Validator() {
-                @Override
-                public ValidationResult validate(String subject, String input, 
ValidationContext context) {
-                    ValidationResult result = new ValidationResult.Builder()
-                            .subject("Local Network 
Interface").valid(true).input(input).build();
-                    if (interfaceSet.contains(input.toLowerCase())) {
-                        return result;
-                    }
-
-                    String message;
-                    String realValue = input;
-                    try {
-                        if (context.isExpressionLanguagePresent(input)) {
-                            AttributeExpression ae = 
context.newExpressionLanguageCompiler().compile(input);
-                            realValue = ae.evaluate();
-                        }
-
-                        if (interfaceSet.contains(realValue.toLowerCase())) {
-                            return result;
-                        }
-
-                        message = realValue + " is not a valid network name. 
Valid names are " + interfaceSet.toString();
-
-                    } catch (IllegalArgumentException e) {
-                        message = "Not a valid AttributeExpression: " + 
e.getMessage();
-                    }
-                    result = new ValidationResult.Builder().subject("Local 
Network Interface")
-                            
.valid(false).input(input).explanation(message).build();
-
-                    return result;
-                }
-            })
-            .expressionLanguageSupported(true)
-            .build();
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java
deleted file mode 100644
index 5215a21..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/AsyncChannelDispatcher.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.dispatcher;
-
-import java.nio.channels.SelectionKey;
-
-/**
- * A ChannelDispatcher that handles channels asynchronously.
- */
-public interface AsyncChannelDispatcher extends ChannelDispatcher {
-
-    /**
-     * Informs the dispatcher that the connection for the given key is 
complete.
-     *
-     * @param key a key that was previously selected
-     */
-    void completeConnection(SelectionKey key);
-
-    /**
-     * Informs the dispatcher that the connection for the given key can be 
added back for selection.
-     *
-     * @param key a key that was previously selected
-     */
-    void addBackForSelection(SelectionKey key);
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
deleted file mode 100644
index 444aeb1..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ChannelDispatcher.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.dispatcher;
-
-import java.io.IOException;
-import java.net.InetAddress;
-
-/**
- * Dispatches handlers for a given channel.
- */
-public interface ChannelDispatcher extends Runnable {
-
-    /**
-     * Opens the dispatcher listening on the given port and attempts to set the
-     * OS socket buffer to maxBufferSize.
-     *
-     * @param nicAddress the local network interface to listen on, if null 
will listen on the wildcard address
-     *                   which means listening on all local network interfaces
-     *
-     * @param port the port to listen on
-     *
-     * @param maxBufferSize the size to set the OS socket buffer to
-     *
-     * @throws IOException if an error occurred listening on the given port
-     */
-    void open(InetAddress nicAddress, int port, int maxBufferSize) throws 
IOException;
-
-    /**
-     * @return the port being listened to
-     */
-    int getPort();
-
-    /**
-     * Closes all listeners and stops all handler threads.
-     */
-    void close();
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
deleted file mode 100644
index 69a1998..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.dispatcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
-import org.apache.nifi.processor.util.listen.event.EventQueue;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Reads from the Datagram channel into an available buffer. If data is read 
then the buffer is queued for
- * processing, otherwise the buffer is returned to the buffer pool.
- */
-public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> 
implements ChannelDispatcher {
-
-    private final EventFactory<E> eventFactory;
-    private final BlockingQueue<ByteBuffer> bufferPool;
-    private final EventQueue<E> events;
-    private final ComponentLog logger;
-    private final String sendingHost;
-    private final Integer sendingPort;
-
-    private Selector selector;
-    private DatagramChannel datagramChannel;
-    private volatile boolean stopped = false;
-
-    public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
-                                     final BlockingQueue<ByteBuffer> 
bufferPool,
-                                     final BlockingQueue<E> events,
-                                     final ComponentLog logger) {
-        this(eventFactory, bufferPool, events, logger, null, null);
-    }
-
-    public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
-                                     final BlockingQueue<ByteBuffer> 
bufferPool,
-                                     final BlockingQueue<E> events,
-                                     final ComponentLog logger,
-                                     final String sendingHost,
-                                     final Integer sendingPort) {
-        this.eventFactory = eventFactory;
-        this.bufferPool = bufferPool;
-        this.logger = logger;
-        this.sendingHost = sendingHost;
-        this.sendingPort = sendingPort;
-        this.events = new EventQueue<>(events, logger);
-
-        if (bufferPool == null || bufferPool.size() == 0) {
-            throw new IllegalArgumentException("A pool of available 
ByteBuffers is required");
-        }
-    }
-
-    @Override
-    public void open(final InetAddress nicAddress, final int port, final int 
maxBufferSize) throws IOException {
-        stopped = false;
-        datagramChannel = DatagramChannel.open();
-        datagramChannel.configureBlocking(false);
-
-        if (maxBufferSize > 0) {
-            datagramChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
maxBufferSize);
-            final int actualReceiveBufSize = 
datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF);
-            if (actualReceiveBufSize < maxBufferSize) {
-                logger.warn("Attempted to set Socket Buffer Size to " + 
maxBufferSize + " bytes but could only set to "
-                        + actualReceiveBufSize + "bytes. You may want to 
consider changing the Operating System's "
-                        + "maximum receive buffer");
-            }
-        }
-
-        // we don't have to worry about nicAddress being null here because 
InetSocketAddress already handles it
-        datagramChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
-        datagramChannel.socket().bind(new InetSocketAddress(nicAddress, port));
-
-        // if a sending host and port were provided then connect to that 
specific address to only receive
-        // datagrams from that host/port, otherwise we can receive datagrams 
from any host/port
-        if (sendingHost != null && sendingPort != null) {
-            datagramChannel.connect(new InetSocketAddress(sendingHost, 
sendingPort));
-        }
-
-        selector = Selector.open();
-        datagramChannel.register(selector, SelectionKey.OP_READ);
-    }
-
-    @Override
-    public void run() {
-        final ByteBuffer buffer = bufferPool.poll();
-        while (!stopped) {
-            try {
-                int selected = selector.select();
-                // if stopped the selector could already be closed which would 
result in a ClosedSelectorException
-                if (selected > 0 && !stopped) {
-                    Iterator<SelectionKey> selectorKeys = 
selector.selectedKeys().iterator();
-                    // if stopped we don't want to modify the keys because 
close() may still be in progress
-                    while (selectorKeys.hasNext() && !stopped) {
-                        SelectionKey key = selectorKeys.next();
-                        selectorKeys.remove();
-                        if (!key.isValid()) {
-                            continue;
-                        }
-                        DatagramChannel channel = (DatagramChannel) 
key.channel();
-                        SocketAddress socketAddress;
-                        buffer.clear();
-                        while (!stopped && (socketAddress = 
channel.receive(buffer)) != null) {
-                            String sender = "";
-                            if (socketAddress instanceof InetSocketAddress) {
-                                sender = ((InetSocketAddress) 
socketAddress).getAddress().toString();
-                            }
-
-                            // create a byte array from the buffer
-                            buffer.flip();
-                            byte bytes[] = new byte[buffer.limit()];
-                            buffer.get(bytes, 0, buffer.limit());
-
-                            final Map<String,String> metadata = 
EventFactoryUtil.createMapWithSender(sender);
-                            final E event = eventFactory.create(bytes, 
metadata, null);
-                            events.offer(event);
-
-                            buffer.clear();
-                        }
-                    }
-                }
-            } catch (InterruptedException e) {
-                stopped = true;
-                Thread.currentThread().interrupt();
-            } catch (IOException e) {
-                logger.error("Error reading from DatagramChannel", e);
-            }
-        }
-
-        if (buffer != null) {
-            try {
-                bufferPool.put(buffer);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    @Override
-    public int getPort() {
-        return datagramChannel == null ? 0 : 
datagramChannel.socket().getLocalPort();
-    }
-
-    @Override
-    public void close() {
-        stopped = true;
-        if (selector != null) {
-            selector.wakeup();
-        }
-        IOUtils.closeQuietly(selector);
-        IOUtils.closeQuietly(datagramChannel);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java
deleted file mode 100644
index f2479f1..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelAttachment.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.dispatcher;
-
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-
-import java.nio.ByteBuffer;
-
-/**
- * Wrapper class so we can attach a buffer and/or an SSLSocketChannel to the 
selector key.
- * */
-public class SocketChannelAttachment {
-
-    private final ByteBuffer byteBuffer;
-    private final SSLSocketChannel sslSocketChannel;
-
-    public SocketChannelAttachment(final ByteBuffer byteBuffer, final 
SSLSocketChannel sslSocketChannel) {
-        this.byteBuffer = byteBuffer;
-        this.sslSocketChannel = sslSocketChannel;
-    }
-
-    public ByteBuffer getByteBuffer() {
-        return byteBuffer;
-    }
-
-    public SSLSocketChannel getSslSocketChannel() {
-        return sslSocketChannel;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
deleted file mode 100644
index b07fbb9..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.dispatcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.security.util.SslContextFactory;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.StandardSocketOptions;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.Iterator;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Accepts Socket connections on the given port and creates a handler for each 
connection to
- * be executed by a thread pool.
- */
-public class SocketChannelDispatcher<E extends Event<SocketChannel>> 
implements AsyncChannelDispatcher {
-
-    private final EventFactory<E> eventFactory;
-    private final ChannelHandlerFactory<E, AsyncChannelDispatcher> 
handlerFactory;
-    private final BlockingQueue<ByteBuffer> bufferPool;
-    private final BlockingQueue<E> events;
-    private final ComponentLog logger;
-    private final int maxConnections;
-    private final SSLContext sslContext;
-    private final SslContextFactory.ClientAuth clientAuth;
-    private final Charset charset;
-
-    private ExecutorService executor;
-    private volatile boolean stopped = false;
-    private Selector selector;
-    private final BlockingQueue<SelectionKey> keyQueue;
-    private final AtomicInteger currentConnections = new AtomicInteger(0);
-
-    public SocketChannelDispatcher(final EventFactory<E> eventFactory,
-                                   final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
-                                   final BlockingQueue<E> events,
-                                   final ComponentLog logger,
-                                   final int maxConnections,
-                                   final SSLContext sslContext,
-                                   final Charset charset) {
-        this(eventFactory, handlerFactory, bufferPool, events, logger, 
maxConnections, sslContext, SslContextFactory.ClientAuth.REQUIRED, charset);
-    }
-
-    public SocketChannelDispatcher(final EventFactory<E> eventFactory,
-                                   final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
-                                   final BlockingQueue<E> events,
-                                   final ComponentLog logger,
-                                   final int maxConnections,
-                                   final SSLContext sslContext,
-                                   final SslContextFactory.ClientAuth 
clientAuth,
-                                   final Charset charset) {
-        this.eventFactory = eventFactory;
-        this.handlerFactory = handlerFactory;
-        this.bufferPool = bufferPool;
-        this.events = events;
-        this.logger = logger;
-        this.maxConnections = maxConnections;
-        this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
-        this.sslContext = sslContext;
-        this.clientAuth = clientAuth;
-        this.charset = charset;
-
-        if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() 
!= maxConnections) {
-            throw new IllegalArgumentException(
-                    "A pool of available ByteBuffers equal to the maximum 
number of connections is required");
-        }
-    }
-
-    @Override
-    public void open(final InetAddress nicAddress, final int port, final int 
maxBufferSize) throws IOException {
-        stopped = false;
-        executor = Executors.newFixedThreadPool(maxConnections);
-
-        final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
-        serverSocketChannel.configureBlocking(false);
-        if (maxBufferSize > 0) {
-            serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
maxBufferSize);
-            final int actualReceiveBufSize = 
serverSocketChannel.getOption(StandardSocketOptions.SO_RCVBUF);
-            if (actualReceiveBufSize < maxBufferSize) {
-                logger.warn("Attempted to set Socket Buffer Size to " + 
maxBufferSize + " bytes but could only set to "
-                        + actualReceiveBufSize + "bytes. You may want to 
consider changing the Operating System's "
-                        + "maximum receive buffer");
-            }
-        }
-
-        serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, 
port));
-
-        selector = Selector.open();
-        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
-    }
-
-    @Override
-    public void run() {
-        while (!stopped) {
-            try {
-                int selected = selector.select();
-                // if stopped the selector could already be closed which would 
result in a ClosedSelectorException
-                if (selected > 0 && !stopped){
-                    Iterator<SelectionKey> selectorKeys = 
selector.selectedKeys().iterator();
-                    // if stopped we don't want to modify the keys because 
close() may still be in progress
-                    while (selectorKeys.hasNext() && !stopped) {
-                        SelectionKey key = selectorKeys.next();
-                        selectorKeys.remove();
-                        if (!key.isValid()){
-                            continue;
-                        }
-                        if (key.isAcceptable()) {
-                            // Handle new connections coming in
-                            final ServerSocketChannel channel = 
(ServerSocketChannel) key.channel();
-                            final SocketChannel socketChannel = 
channel.accept();
-                            // Check for available connections
-                            if (currentConnections.incrementAndGet() > 
maxConnections){
-                                currentConnections.decrementAndGet();
-                                logger.warn("Rejecting connection from {} 
because max connections has been met",
-                                        new Object[]{ 
socketChannel.getRemoteAddress().toString() });
-                                IOUtils.closeQuietly(socketChannel);
-                                continue;
-                            }
-                            logger.debug("Accepted incoming connection from 
{}",
-                                    new 
Object[]{socketChannel.getRemoteAddress().toString()});
-                            // Set socket to non-blocking, and register with 
selector
-                            socketChannel.configureBlocking(false);
-                            SelectionKey readKey = 
socketChannel.register(selector, SelectionKey.OP_READ);
-
-                            // Prepare the byte buffer for the reads, clear it 
out
-                            ByteBuffer buffer = bufferPool.poll();
-                            buffer.clear();
-                            buffer.mark();
-
-                            // If we have an SSLContext then create an 
SSLEngine for the channel
-                            SSLSocketChannel sslSocketChannel = null;
-                            if (sslContext != null) {
-                                final SSLEngine sslEngine = 
sslContext.createSSLEngine();
-                                sslEngine.setUseClientMode(false);
-
-                                switch (clientAuth) {
-                                    case REQUIRED:
-                                        sslEngine.setNeedClientAuth(true);
-                                        break;
-                                    case WANT:
-                                        sslEngine.setWantClientAuth(true);
-                                        break;
-                                    case NONE:
-                                        sslEngine.setNeedClientAuth(false);
-                                        sslEngine.setWantClientAuth(false);
-                                        break;
-                                }
-
-                                sslSocketChannel = new 
SSLSocketChannel(sslEngine, socketChannel);
-                            }
-
-                            // Attach the buffer and SSLSocketChannel to the 
key
-                            SocketChannelAttachment attachment = new 
SocketChannelAttachment(buffer, sslSocketChannel);
-                            readKey.attach(attachment);
-                        } else if (key.isReadable()) {
-                            // Clear out the operations the select is 
interested in until done reading
-                            key.interestOps(0);
-                            // Create a handler based on the protocol and 
whether an SSLEngine was provided or not
-                            final Runnable handler;
-                            if (sslContext != null) {
-                                handler = handlerFactory.createSSLHandler(key, 
this, charset, eventFactory, events, logger);
-                            } else {
-                                handler = handlerFactory.createHandler(key, 
this, charset, eventFactory, events, logger);
-                            }
-
-                            // run the handler
-                            executor.execute(handler);
-                        }
-                    }
-                }
-                // Add back all idle sockets to the select
-                SelectionKey key;
-                while((key = keyQueue.poll()) != null){
-                    key.interestOps(SelectionKey.OP_READ);
-                }
-            } catch (IOException e) {
-                logger.error("Error accepting connection from SocketChannel", 
e);
-            }
-        }
-    }
-
-    @Override
-    public int getPort() {
-        // Return the port for the key listening for accepts
-        for(SelectionKey key : selector.keys()){
-            if (key.isValid()) {
-                final Channel channel = key.channel();
-                if (channel instanceof  ServerSocketChannel) {
-                    return 
((ServerSocketChannel)channel).socket().getLocalPort();
-                }
-            }
-        }
-        return 0;
-    }
-
-    @Override
-    public void close() {
-        stopped = true;
-        if (selector != null) {
-            selector.wakeup();
-        }
-
-        if (executor != null) {
-            executor.shutdown();
-            try {
-                // Wait a while for existing tasks to terminate
-                if (!executor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
-                    executor.shutdownNow();
-                }
-            } catch (InterruptedException ie) {
-                // (Re-)Cancel if current thread also interrupted
-                executor.shutdownNow();
-                // Preserve interrupt status
-                Thread.currentThread().interrupt();
-            }
-        }
-
-        if (selector != null) {
-            synchronized (selector.keys()) {
-                for (SelectionKey key : selector.keys()) {
-                    IOUtils.closeQuietly(key.channel());
-                }
-            }
-        }
-        IOUtils.closeQuietly(selector);
-    }
-
-    @Override
-    public void completeConnection(SelectionKey key) {
-        // connection is done. Return the buffer to the pool
-        SocketChannelAttachment attachment = (SocketChannelAttachment) 
key.attachment();
-        try {
-            bufferPool.put(attachment.getByteBuffer());
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
-        currentConnections.decrementAndGet();
-    }
-
-    @Override
-    public void addBackForSelection(SelectionKey key) {
-        keyQueue.offer(key);
-        selector.wakeup();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java
deleted file mode 100644
index 83989f8..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/Event.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.event;
-
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-
-import java.nio.channels.SelectableChannel;
-
-/**
- * An event that was read from a channel.
- *
- * @param <C> the type of SelectableChannel the event was read from
- */
-public interface Event<C extends SelectableChannel> {
-
-    /**
-     * @return the sending host of the data
-     */
-    String getSender();
-
-    /**
-     * @return raw data for this event
-     */
-    byte[] getData();
-
-    /**
-     * @return the responder to use for responding to this event, or null
-     *              if responses are not supported
-     */
-    ChannelResponder<C> getResponder();
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
deleted file mode 100644
index 1bd9f0d..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactory.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.event;
-
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-
-import java.util.Map;
-
-/**
- * Factory to create instances of a given type of Event.
- */
-public interface EventFactory<E extends Event> {
-
-    /**
-     * The key in the metadata map for the sender.
-     */
-    String SENDER_KEY = "sender";
-
-    /**
-     * Creates an event for the given data and metadata.
-     *
-     * @param data raw data from a channel
-     * @param metadata additional metadata
-     * @param responder a responder for the event with the channel populated
-     *
-     * @return an instance of the given type
-     */
-    E create(final byte[] data, final Map<String, String> metadata, final 
ChannelResponder responder);
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
deleted file mode 100644
index 54529cf..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventFactoryUtil.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.event;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Helper methods for building the metadata handed to an EventFactory.
- */
-public class EventFactoryUtil {
-
-    /**
-     * Builds a fresh metadata map whose only entry is the sender, stored under
-     * {@link EventFactory#SENDER_KEY}.
-     *
-     * @param sender the sender value to record; stored exactly as given
-     * @return a new HashMap containing the single sender entry
-     */
-    public static Map<String,String> createMapWithSender(final String sender) {
-        final Map<String,String> senderMetadata = new HashMap<>();
-        senderMetadata.put(EventFactory.SENDER_KEY, sender);
-        return senderMetadata;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java
deleted file mode 100644
index 35e9ae0..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/EventQueue.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.event;
-
-import org.apache.commons.lang3.Validate;
-import org.apache.nifi.logging.ComponentLog;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Wraps a BlockingQueue to centralize logic for offering events across UDP, 
TCP, and SSL.
- *
- * @param <E> the type of event
- */
-public class EventQueue<E extends Event> {
-
-    /**
-     * The default number of milliseconds to wait when offering new events to 
the queue.
-     */
-    public static final long DEFAULT_OFFER_WAIT_MS = 100;
-
-    private final long offerWaitMs;
-    private final BlockingQueue<E> events;
-    private final ComponentLog logger;
-
-    public EventQueue(final BlockingQueue<E> events, final ComponentLog 
logger) {
-        this(events, DEFAULT_OFFER_WAIT_MS, logger);
-    }
-
-    public EventQueue(final BlockingQueue<E> events, final long offerWaitMs, 
final ComponentLog logger) {
-        this.events = events;
-        this.offerWaitMs = offerWaitMs;
-        this.logger = logger;
-        Validate.notNull(this.events);
-        Validate.notNull(this.logger);
-    }
-
-    /**
-     * Offers the given event to the events queue with a wait time, if the 
offer fails the event
-     * is dropped an error is logged.
-     *
-     * @param event the event to offer
-     * @throws InterruptedException if interrupted while waiting to offer
-     */
-    public void offer(final E event) throws InterruptedException {
-        boolean queued = events.offer(event, offerWaitMs, 
TimeUnit.MILLISECONDS);
-        if (!queued) {
-            logger.error("Internal queue at maximum capacity, could not queue 
event");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
deleted file mode 100644
index fa3699e..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEvent.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.event;
-
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-
-import java.nio.channels.SelectableChannel;
-
-/**
- * Standard implementation of Event: an immutable holder for the sender, the raw
- * data and the responder of a received message.
- *
- * @param <C> the type of channel the responder works with
- */
-public class StandardEvent<C extends SelectableChannel> implements Event<C> {
-
-    private final byte[] data;
-    private final String sender;
-    private final ChannelResponder<C> responder;
-
-    /**
-     * @param sender the sender of the data
-     * @param data the raw data of the event; the array is stored as given, not copied
-     * @param responder the responder associated with this event
-     */
-    public StandardEvent(final String sender, final byte[] data, final ChannelResponder<C> responder) {
-        this.data = data;
-        this.sender = sender;
-        this.responder = responder;
-    }
-
-    /**
-     * @return the raw data array this event was created with (the same array, not a copy)
-     */
-    @Override
-    public byte[] getData() {
-        return data;
-    }
-
-    /**
-     * @return the sender this event was created with
-     */
-    @Override
-    public String getSender() {
-        return sender;
-    }
-
-    /**
-     * @return the responder this event was created with
-     */
-    public ChannelResponder<C> getResponder() {
-        return responder;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
deleted file mode 100644
index 9ae6161..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/event/StandardEventFactory.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.event;
-
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-
-import java.util.Map;
-
-/**
- * EventFactory that produces StandardEvent instances.
- */
-public class StandardEventFactory implements EventFactory<StandardEvent> {
-
-    /**
-     * Creates a StandardEvent whose sender is taken from the metadata map.
-     *
-     * @param data raw data for the event
-     * @param metadata optional metadata; may be null, in which case the sender is null
-     * @param responder the responder handed to the new event
-     * @return a new StandardEvent carrying the sender (or null), the data and the responder
-     */
-    @Override
-    public StandardEvent create(final byte[] data, final Map<String, String> metadata,
-                                final ChannelResponder responder) {
-        // Map.get yields null for an absent key, so a missing map and a missing entry both give a null sender
-        final String sender = (metadata == null) ? null : metadata.get(EventFactory.SENDER_KEY);
-        return new StandardEvent(sender, data, responder);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java
deleted file mode 100644
index 84ef062..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.handler;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.event.EventQueue;
-
-import java.nio.channels.SelectionKey;
-import java.nio.charset.Charset;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Base class for all channel handlers. Holds the state shared by handlers and
- * leaves {@code run()} to sub-classes.
- *
- * @param <E> the type of event the handler produces
- * @param <D> the type of dispatcher the handler reports back to
- */
-public abstract class ChannelHandler<E extends Event, D extends ChannelDispatcher> implements Runnable {
-
-    protected final SelectionKey key;
-    protected final D dispatcher;
-    protected final Charset charset;
-    protected final EventFactory<E> eventFactory;
-    protected final EventQueue<E> events;
-    protected final ComponentLog logger;
-
-    /**
-     * @param key the selection key of the channel being handled
-     * @param dispatcher the dispatcher that owns the channel
-     * @param charset the character set made available to sub-classes
-     * @param eventFactory the factory used to create events
-     * @param events the queue to receive events; wrapped in an EventQueue
-     * @param logger the logger for this handler, also given to the EventQueue
-     */
-    public ChannelHandler(final SelectionKey key,
-                          final D dispatcher,
-                          final Charset charset,
-                          final EventFactory<E> eventFactory,
-                          final BlockingQueue<E> events,
-                          final ComponentLog logger) {
-        this.logger = logger;
-        this.events = new EventQueue<>(events, logger);
-        this.eventFactory = eventFactory;
-        this.charset = charset;
-        this.dispatcher = dispatcher;
-        this.key = key;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java
deleted file mode 100644
index 9ca6bdd..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/ChannelHandlerFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.handler;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-
-import java.nio.channels.SelectionKey;
-import java.nio.charset.Charset;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Factory that can produce ChannelHandlers for the given type of Event and ChannelDispatcher.
- *
- * @param <E> the type of event the created handlers produce
- * @param <D> the type of dispatcher the created handlers work with
- */
-public interface ChannelHandlerFactory<E extends Event, D extends ChannelDispatcher> {
-
-    /**
-     * Creates a handler for the channel behind the given key.
-     *
-     * @param key the selection key of the channel to handle
-     * @param dispatcher the dispatcher for the channel
-     * @param charset the character set for the handler
-     * @param eventFactory the factory the handler uses to create events
-     * @param events the queue the handler places events on
-     * @param logger the logger for the handler
-     * @return a handler for the given key
-     */
-    ChannelHandler<E, D> createHandler(SelectionKey key,
-                                       D dispatcher,
-                                       Charset charset,
-                                       EventFactory<E> eventFactory,
-                                       BlockingQueue<E> events,
-                                       ComponentLog logger);
-
-    /**
-     * Creates an SSL handler for the channel behind the given key.
-     *
-     * @param key the selection key of the channel to handle
-     * @param dispatcher the dispatcher for the channel
-     * @param charset the character set for the handler
-     * @param eventFactory the factory the handler uses to create events
-     * @param events the queue the handler places events on
-     * @param logger the logger for the handler
-     * @return an SSL handler for the given key
-     */
-    ChannelHandler<E, D> createSSLHandler(SelectionKey key,
-                                          D dispatcher,
-                                          Charset charset,
-                                          EventFactory<E> eventFactory,
-                                          BlockingQueue<E> events,
-                                          ComponentLog logger);
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
deleted file mode 100644
index ef747e1..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SSLSocketChannelHandler.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.handler.socket;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
-import 
org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Wraps a SocketChannel with an SSLSocketChannel for receiving messages over 
TLS.
- */
-public class SSLSocketChannelHandler<E extends Event<SocketChannel>> extends 
SocketChannelHandler<E> {
-
-    private final ByteArrayOutputStream currBytes = new 
ByteArrayOutputStream(4096);
-
-    public SSLSocketChannelHandler(final SelectionKey key,
-                                   final AsyncChannelDispatcher dispatcher,
-                                   final Charset charset,
-                                   final EventFactory<E> eventFactory,
-                                   final BlockingQueue<E> events,
-                                   final ComponentLog logger) {
-        super(key, dispatcher, charset, eventFactory, events, logger);
-    }
-
-    @Override
-    public void run() {
-        boolean eof = false;
-        SSLSocketChannel sslSocketChannel = null;
-        try {
-            int bytesRead;
-            final SocketChannel socketChannel = (SocketChannel) key.channel();
-            final SocketChannelAttachment attachment = 
(SocketChannelAttachment) key.attachment();
-
-            // get the SSLSocketChannel from the attachment
-            sslSocketChannel = attachment.getSslSocketChannel();
-
-            // SSLSocketChannel deals with byte[] so ByteBuffer isn't used 
here, but we'll use the size to create a new byte[]
-            final ByteBuffer socketBuffer = attachment.getByteBuffer();
-            byte[] socketBufferArray = new byte[socketBuffer.limit()];
-
-            // read until no more data
-            try {
-                while ((bytesRead = sslSocketChannel.read(socketBufferArray)) 
> 0) {
-                    processBuffer(sslSocketChannel, socketChannel, bytesRead, 
socketBufferArray);
-                    logger.debug("bytes read from sslSocketChannel {}", new 
Object[]{bytesRead});
-                }
-            } catch (SocketTimeoutException ste) {
-                // SSLSocketChannel will throw this exception when 0 bytes are 
read and the timeout threshold
-                // is exceeded, we don't want to close the connection in this 
case
-                bytesRead = 0;
-            }
-
-            // Check for closed socket
-            if( bytesRead < 0 ){
-                eof = true;
-                logger.debug("Reached EOF, closing connection");
-            } else {
-                logger.debug("No more data available, returning for 
selection");
-            }
-        } catch (ClosedByInterruptException | InterruptedException e) {
-            logger.debug("read loop interrupted, closing connection");
-            // Treat same as closed socket
-            eof = true;
-        } catch (ClosedChannelException e) {
-            // ClosedChannelException doesn't have a message so handle it 
separately from IOException
-            logger.error("Error reading from channel due to channel being 
closed", e);
-            // Treat same as closed socket
-            eof = true;
-        } catch (IOException e) {
-            logger.error("Error reading from channel due to {}", new Object[] 
{e.getMessage()}, e);
-            // Treat same as closed socket
-            eof = true;
-        } finally {
-            if(eof == true) {
-                IOUtils.closeQuietly(sslSocketChannel);
-                dispatcher.completeConnection(key);
-            } else {
-                dispatcher.addBackForSelection(key);
-            }
-        }
-    }
-
-    /**
-     * Process the contents of the buffer. Give sub-classes a chance to 
override this behavior.
-     *
-     * @param sslSocketChannel the channel the data was read from
-     * @param socketChannel the socket channel being wrapped by 
sslSocketChannel
-     * @param bytesRead the number of bytes read
-     * @param buffer the buffer to process
-     * @throws InterruptedException thrown if interrupted while queuing events
-     */
-    protected void processBuffer(final SSLSocketChannel sslSocketChannel, 
final SocketChannel socketChannel,
-                                 final int bytesRead, final byte[] buffer) 
throws InterruptedException, IOException {
-        final InetAddress sender = socketChannel.socket().getInetAddress();
-
-        // go through the buffer looking for the end of each message
-        for (int i = 0; i < bytesRead; i++) {
-            final byte currByte = buffer[i];
-
-            // check if at end of a message
-            if (currByte == getDelimiter()) {
-                if (currBytes.size() > 0) {
-                    final SSLSocketChannelResponder response = new 
SSLSocketChannelResponder(socketChannel, sslSocketChannel);
-                    final Map<String, String> metadata = 
EventFactoryUtil.createMapWithSender(sender.toString());
-                    final E event = 
eventFactory.create(currBytes.toByteArray(), metadata, response);
-                    events.offer(event);
-                    currBytes.reset();
-                }
-            } else {
-                currBytes.write(currByte);
-            }
-        }
-    }
-
-    @Override
-    public byte getDelimiter() {
-        return TCP_DELIMITER;
-    }
-
-}

Reply via email to