http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
new file mode 100644
index 0000000..c52b4b7
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.util.Set;
+
+import org.apache.nifi.remote.PeerStatus;
+
+public class PeerStatusCache {
+
+    private final Set<PeerStatus> statuses;
+    private final long timestamp;
+
+    public PeerStatusCache(final Set<PeerStatus> statuses) {
+        this(statuses, System.currentTimeMillis());
+    }
+
+    public PeerStatusCache(final Set<PeerStatus> statuses, final long 
timestamp) {
+        this.statuses = statuses;
+        this.timestamp = timestamp;
+    }
+
+    public Set<PeerStatus> getStatuses() {
+        return statuses;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
new file mode 100644
index 0000000..70bb324
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.MinimumLengthInputStream;
+
+public class StandardDataPacket implements DataPacket {
+
+    private final Map<String, String> attributes;
+    private final InputStream stream;
+    private final long size;
+
+    public StandardDataPacket(final Map<String, String> attributes, final 
InputStream stream, final long size) {
+        this.attributes = attributes;
+        this.stream = new MinimumLengthInputStream(new 
LimitingInputStream(stream, size), size);
+        this.size = size;
+    }
+
+    public Map<String, String> getAttributes() {
+        return attributes;
+    }
+
+    public InputStream getData() {
+        return stream;
+    }
+
+    public long getSize() {
+        return size;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
new file mode 100644
index 0000000..8336745
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
+import org.junit.Test;
+
+public class TestEndpointConnectionStatePool {
+
+    @Test
+    public void testFormulateDestinationListForOutput() throws IOException {
+        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 
4096));
+        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 
10240));
+        collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 
1024));
+        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 
4096));
+        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 
4096));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = 
EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.RECEIVE);
+        for (final PeerStatus peerStatus : destinations) {
+            System.out.println(peerStatus.getPeerDescription());
+        }
+    }
+
+    @Test
+    public void testFormulateDestinationListForOutputHugeDifference() throws 
IOException {
+        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 
500));
+        collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 
50000));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = 
EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.RECEIVE);
+        for (final PeerStatus peerStatus : destinations) {
+            System.out.println(peerStatus.getPeerDescription());
+        }
+    }
+
+    @Test
+    public void testFormulateDestinationListForInputPorts() throws IOException 
{
+        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 
4096));
+        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 
10240));
+        collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 
1024));
+        collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 
4096));
+        collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 
4096));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = 
EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        for (final PeerStatus peerStatus : destinations) {
+            System.out.println(peerStatus.getPeerDescription());
+        }
+    }
+
+    @Test
+    public void testFormulateDestinationListForInputPortsHugeDifference() 
throws IOException {
+        final ClusterNodeInformation clusterNodeInfo = new 
ClusterNodeInformation();
+        final List<NodeInformation> collection = new ArrayList<>();
+        collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 
500));
+        collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 
50000));
+
+        clusterNodeInfo.setNodeInformation(collection);
+        final List<PeerStatus> destinations = 
EndpointConnectionPool.formulateDestinationList(clusterNodeInfo, 
TransferDirection.SEND);
+        for (final PeerStatus peerStatus : destinations) {
+            System.out.println(peerStatus.getPeerDescription());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
new file mode 100644
index 0000000..155fc95
--- /dev/null
+++ 
b/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client.socket;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestSiteToSiteClient {
+
+    @Test
+    @Ignore("For local testing only; not really a unit test but a manual test")
+    public void testReceive() throws IOException {
+        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", 
"DEBUG");
+
+        final SiteToSiteClient client = new SiteToSiteClient.Builder()
+                .url("http://localhost:8080/nifi";)
+                .portName("cba")
+                .requestBatchCount(10)
+                .build();
+
+        try {
+            for (int i = 0; i < 1000; i++) {
+                final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
+                Assert.assertNotNull(transaction);
+
+                DataPacket packet;
+                while (true) {
+                    packet = transaction.receive();
+                    if (packet == null) {
+                        break;
+                    }
+
+                    final InputStream in = packet.getData();
+                    final long size = packet.getSize();
+                    final byte[] buff = new byte[(int) size];
+
+                    StreamUtils.fillBuffer(in, buff);
+                }
+
+                transaction.confirm();
+                transaction.complete();
+            }
+        } finally {
+            client.close();
+        }
+    }
+
+    @Test
+    @Ignore("For local testing only; not really a unit test but a manual test")
+    public void testSend() throws IOException {
+        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", 
"DEBUG");
+
+        final SiteToSiteClient client = new SiteToSiteClient.Builder()
+                .url("http://localhost:8080/nifi";)
+                .portName("input")
+                .build();
+
+        try {
+            final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
+            Assert.assertNotNull(transaction);
+
+            final Map<String, String> attrs = new HashMap<>();
+            attrs.put("site-to-site", "yes, please!");
+            final byte[] bytes = "Hello".getBytes();
+            final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+            final DataPacket packet = new StandardDataPacket(attrs, bais, 
bytes.length);
+            transaction.send(packet);
+
+            transaction.confirm();
+            transaction.complete();
+        } finally {
+            client.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-socket-utils/pom.xml 
b/nifi-commons/nifi-socket-utils/pom.xml
new file mode 100644
index 0000000..4ed05ef
--- /dev/null
+++ b/nifi-commons/nifi-socket-utils/pom.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-socket-utils</artifactId>
+    <description>Utilities for socket communication</description>
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-logging-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
new file mode 100644
index 0000000..cc24575
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/AbstractChannelReader.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumer;
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractChannelReader implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractChannelReader.class);
+    private final String uniqueId;
+    private final SelectionKey key;
+    private final BufferPool bufferPool;
+    private final StreamConsumer consumer;
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
+    private final AtomicReference<ScheduledFuture<?>> future = new 
AtomicReference<>(null);//the future on which this reader runs...
+
+    public AbstractChannelReader(final String id, final SelectionKey key, 
final BufferPool empties, final StreamConsumerFactory consumerFactory) {
+        this.uniqueId = id;
+        this.key = key;
+        this.bufferPool = empties;
+        this.consumer = consumerFactory.newInstance(id);
+        consumer.setReturnBufferQueue(bufferPool);
+    }
+
+    protected void setScheduledFuture(final ScheduledFuture<?> future) {
+        this.future.set(future);
+    }
+
+    protected ScheduledFuture<?> getScheduledFuture() {
+        return future.get();
+    }
+
+    protected SelectionKey getSelectionKey() {
+        return key;
+    }
+
+    public boolean isClosed() {
+        return isClosed.get();
+    }
+
+    private void closeStream() {
+        if (isClosed.get()) {
+            return;
+        }
+        try {
+            isClosed.set(true);
+            future.get().cancel(false);
+            key.cancel();
+            key.channel().close();
+        } catch (final IOException ioe) {
+            LOGGER.warn("Unable to cleanly close stream due to " + ioe);
+        } finally {
+            consumer.signalEndOfStream();
+        }
+    }
+
+    /**
+     * Allows a subclass to specifically handle how it reads from the given
+     * key's channel into the given buffer.
+     *
+     * @param key of channel to read from
+     * @param buffer to fill
+     * @return the number of bytes read in the final read cycle. A value of 
zero
+     * or more indicates the channel is still open but a value of -1 indicates
+     * end of stream.
+     * @throws IOException if reading from channel causes failure
+     */
+    protected abstract int fillBuffer(SelectionKey key, ByteBuffer buffer) 
throws IOException;
+
+    @Override
+    public final void run() {
+        if (!key.isValid() || consumer.isConsumerFinished()) {
+            closeStream();
+            return;
+        }
+        if (!key.isReadable()) {
+            return;//there is nothing available to read...or we aren't allow 
to read due to throttling
+        }
+        ByteBuffer buffer = null;
+        try {
+            buffer = bufferPool.poll();
+            if (buffer == null) {
+                return; // no buffers available - come back later
+            }
+            final int bytesRead = fillBuffer(key, buffer);
+            buffer.flip();
+            if (buffer.remaining() > 0) {
+                consumer.addFilledBuffer(buffer);
+                buffer = null; //clear the reference - is now the consumer's 
responsiblity
+            } else {
+                buffer.clear();
+                bufferPool.returnBuffer(buffer, 0);
+                buffer = null; //clear the reference - is now back to the queue
+            }
+            if (bytesRead < 0) { //we've reached the end
+                closeStream();
+            }
+        } catch (final Exception ioe) {
+            closeStream();
+            LOGGER.error("Closed channel reader " + this + " due to " + ioe);
+        } finally {
+            if (buffer != null) {
+                buffer.clear();
+                bufferPool.returnBuffer(buffer, 0);
+            }
+        }
+    }
+
+    @Override
+    public final boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (obj.getClass() != getClass()) {
+            return false;
+        }
+        AbstractChannelReader rhs = (AbstractChannelReader) obj;
+        return new 
EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, 
rhs.uniqueId).isEquals();
+    }
+
+    @Override
+    public final int hashCode() {
+        return new HashCodeBuilder(17, 37).append(uniqueId).toHashCode();
+    }
+
+    @Override
+    public final String toString() {
+        return new ToStringBuilder(this, 
ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
new file mode 100644
index 0000000..007034b
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/BufferPool.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BufferPool implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BufferPool.class);
+    final BlockingQueue<ByteBuffer> bufferPool;
+    private final static double ONE_MB = 1 << 20;
+    private Calendar lastRateSampleTime = Calendar.getInstance();
+    private final Calendar startTime = Calendar.getInstance();
+    double lastRateSampleMBps = -1.0;
+    double overallMBps = -1.0;
+    private long totalBytesExtracted = 0L;
+    private long lastTotalBytesExtracted = 0L;
+    final double maxRateMBps;
+
+    public BufferPool(final int bufferCount, final int bufferCapacity, final 
boolean allocateDirect, final double maxRateMBps) {
+        bufferPool = new 
LinkedBlockingDeque<>(BufferPool.createBuffers(bufferCount, bufferCapacity, 
allocateDirect));
+        this.maxRateMBps = maxRateMBps;
+    }
+
+    /**
+     * Returns the given buffer to the pool - and clears it.
+     *
+     * @param buffer buffer to return
+     * @param bytesProcessed bytes processed for this buffer being returned
+     * @return true if buffer returned to pool
+     */
+    public synchronized boolean returnBuffer(ByteBuffer buffer, final int 
bytesProcessed) {
+        totalBytesExtracted += bytesProcessed;
+        buffer.clear();
+        return bufferPool.add(buffer);
+    }
+
+    //here we enforce the desired rate we want by restricting access to 
buffers when we're over rate
+    public synchronized ByteBuffer poll() {
+        computeRate();
+        final double weightedAvg = (lastRateSampleMBps * 0.7) + (overallMBps * 
0.3);
+        if (overallMBps >= maxRateMBps || weightedAvg >= maxRateMBps) {
+            return null;
+        }
+        return bufferPool.poll();
+    }
+
+    public int size() {
+        return bufferPool.size();
+    }
+
+    private synchronized void computeRate() {
+        final Calendar now = Calendar.getInstance();
+        final long measurementDurationMillis = now.getTimeInMillis() - 
lastRateSampleTime.getTimeInMillis();
+        final double duractionSecs = ((double) measurementDurationMillis) / 
1000.0;
+        if (duractionSecs >= 0.75) { //recompute every 3/4 second or when 
we're too fast
+            final long totalDuractionMillis = now.getTimeInMillis() - 
startTime.getTimeInMillis();
+            final double totalDurationSecs = ((double) totalDuractionMillis) / 
1000.0;
+            final long differenceBytes = totalBytesExtracted - 
lastTotalBytesExtracted;
+            lastTotalBytesExtracted = totalBytesExtracted;
+            lastRateSampleTime = now;
+            final double bps = ((double) differenceBytes) / duractionSecs;
+            final double totalBps = ((double) totalBytesExtracted / 
totalDurationSecs);
+            lastRateSampleMBps = bps / ONE_MB;
+            overallMBps = totalBps / ONE_MB;
+        }
+    }
+
+    public static List<ByteBuffer> createBuffers(final int bufferCount, final 
int bufferCapacity, final boolean allocateDirect) {
+        final List<ByteBuffer> buffers = new ArrayList<>();
+        for (int i = 0; i < bufferCount; i++) {
+            final ByteBuffer buffer = (allocateDirect) ? 
ByteBuffer.allocateDirect(bufferCapacity) : ByteBuffer.allocate(bufferCapacity);
+            buffers.add(buffer);
+        }
+        return buffers;
+    }
+
+    private void logChannelReadRates() {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(String.format("Overall rate= %,.4f MB/s / Current 
Rate= %,.4f MB/s / Total Bytes Read= %d", overallMBps, lastRateSampleMBps, 
totalBytesExtracted));
+        }
+    }
+
+    @Override
+    public void run() {
+        computeRate();
+        logChannelReadRates();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
new file mode 100644
index 0000000..a4308e3
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelDispatcher.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ChannelDispatcher implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ChannelDispatcher.class);
+    private final Selector serverSocketSelector;
+    private final Selector socketChannelSelector;
+    private final ScheduledExecutorService executor;
+    private final BufferPool emptyBuffers;
+    private final StreamConsumerFactory factory;
+    private final AtomicLong channelReaderFrequencyMilliseconds = new 
AtomicLong(DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS);
+    private final long timeout;
+    private final boolean readSingleDatagram;
+    private volatile boolean stop = false;
+    public static final long DEFAULT_CHANNEL_READER_PERIOD_MILLISECONDS = 100L;
+
+    public ChannelDispatcher(final Selector serverSocketSelector, final 
Selector socketChannelSelector, final ScheduledExecutorService service,
+            final StreamConsumerFactory factory, final BufferPool buffers, 
final long timeout, final TimeUnit unit, final boolean readSingleDatagram) {
+        this.serverSocketSelector = serverSocketSelector;
+        this.socketChannelSelector = socketChannelSelector;
+        this.executor = service;
+        this.factory = factory;
+        emptyBuffers = buffers;
+        this.timeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+        this.readSingleDatagram = readSingleDatagram;
+    }
+
+    public void setChannelReaderFrequency(final long period, final TimeUnit 
timeUnit) {
+        
channelReaderFrequencyMilliseconds.set(TimeUnit.MILLISECONDS.convert(period, 
timeUnit));
+    }
+
+    @Override
+    public void run() {
+        while (!stop) {
+            try {
+                selectServerSocketKeys();
+                selectSocketChannelKeys();
+            } catch (final Exception ex) {
+                LOGGER.warn("Key selection failed: {} Normal during 
shutdown.", new Object[]{ex});
+            }
+        }
+    }
+
+    /*
+     * When serverSocketsChannels are registered with the selector, want each 
invoke of this method to loop through all
+     * channels' keys.
+     *
+     * @throws IOException if unable to select keys
+     */
+    private void selectServerSocketKeys() throws IOException {
+        int numSelected = serverSocketSelector.select(timeout);
+        if (numSelected == 0) {
+            return;
+        }
+
+        // for each registered server socket - see if any connections are 
waiting to be established
+        final Iterator<SelectionKey> itr = 
serverSocketSelector.selectedKeys().iterator();
+        while (itr.hasNext()) {
+            SelectionKey serverSocketkey = itr.next();
+            final SelectableChannel channel = serverSocketkey.channel();
+            AbstractChannelReader reader = null;
+            if (serverSocketkey.isValid() && serverSocketkey.isAcceptable()) {
+                final ServerSocketChannel ssChannel = (ServerSocketChannel) 
serverSocketkey.channel();
+                final SocketChannel sChannel = ssChannel.accept();
+                if (sChannel != null) {
+                    sChannel.configureBlocking(false);
+                    final SelectionKey socketChannelKey = 
sChannel.register(socketChannelSelector, SelectionKey.OP_READ);
+                    final String readerId = sChannel.socket().toString();
+                    reader = new SocketChannelReader(readerId, 
socketChannelKey, emptyBuffers, factory);
+                    final ScheduledFuture<?> readerFuture = 
executor.scheduleWithFixedDelay(reader, 10L,
+                            channelReaderFrequencyMilliseconds.get(), 
TimeUnit.MILLISECONDS);
+                    reader.setScheduledFuture(readerFuture);
+                    socketChannelKey.attach(reader);
+                }
+            }
+            itr.remove(); // do this so that the next select operation returns 
a positive value; otherwise, it will return 0.
+            if (reader != null && LOGGER.isDebugEnabled()) {
+                LOGGER.debug(this + " New Connection established.  Server 
channel: " + channel + " Reader: " + reader);
+            }
+        }
+    }
+
+    /*
+     * When invoking this method, only want to iterate through the selected 
keys once. When a key is entered into the selectors
+     * selected key set, select will return a positive value. The next select 
will return 0 if nothing has changed. Note that
+     * the selected key set is not manually changed via a remove operation.
+     *
+     * @throws IOException if unable to select keys
+     */
+    private void selectSocketChannelKeys() throws IOException {
+        // once a channel associated with a key in this selector is 'ready', 
it causes this select to immediately return.
+        // thus, for each trip through the run() we only get hit with one real 
timeout...the one in selectServerSocketKeys.
+        int numSelected = socketChannelSelector.select(timeout);
+        if (numSelected == 0) {
+            return;
+        }
+
+        for (SelectionKey socketChannelKey : 
socketChannelSelector.selectedKeys()) {
+            final SelectableChannel channel = socketChannelKey.channel();
+            AbstractChannelReader reader = null;
+            // there are 2 kinds of channels in this selector, both which have 
their own readers and are executed in their own
+            // threads. We will get here whenever a new SocketChannel is 
created due to an incoming connection. However,
+            // for a DatagramChannel we don't want to create a new reader 
unless it is a new DatagramChannel. The only
+            // way to tell if it's new is the lack of an attachment.
+            if (channel instanceof DatagramChannel && 
socketChannelKey.attachment() == null) {
+                reader = new 
DatagramChannelReader(UUID.randomUUID().toString(), socketChannelKey, 
emptyBuffers, factory, readSingleDatagram);
+                socketChannelKey.attach(reader);
+                final ScheduledFuture<?> readerFuture = 
executor.scheduleWithFixedDelay(reader, 10L, 
channelReaderFrequencyMilliseconds.get(),
+                        TimeUnit.MILLISECONDS);
+                reader.setScheduledFuture(readerFuture);
+            }
+            if (reader != null && LOGGER.isDebugEnabled()) {
+                LOGGER.debug(this + " New Connection established.  Server 
channel: " + channel + " Reader: " + reader);
+            }
+        }
+
+    }
+
+    public void stop() {
+        stop = true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
new file mode 100644
index 0000000..ab77063
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/ChannelListener.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.StandardSocketOptions;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides the entry point to NIO based socket listeners for NiFi
+ * processors and services. There are 2 supported types of Listeners, Datagram
+ * (UDP based transmissions) and ServerSocket (TCP based transmissions). This
+ * will create the ChannelDispatcher, which is a Runnable and is controlled via
+ * the ScheduledExecutorService, which is also created by this class. The
+ * ChannelDispatcher handles connections to the ServerSocketChannels and 
creates
+ * the readers associated with the resulting SocketChannels. Additionally, this
+ * creates and manages two Selectors, one for ServerSocketChannels and another
+ * for SocketChannels and DatagramChannels.
+ *
+ * The threading model for this consists of one thread for the
+ * ChannelDispatcher, one thread per added SocketChannel reader, one thread per
+ * added DatagramChannel reader. The ChannelDispatcher is not scheduled with
+ * fixed delay as the others are. It is throttled by the provided timeout 
value.
+ * Within the ChannelDispatcher there are two blocking operations which will
+ * block for the given timeout each time through the enclosing loop.
+ *
+ * All channels are cached in one of the two Selectors via their SelectionKey.
+ * The serverSocketSelector maintains all the added ServerSocketChannels; the
+ * socketChannelSelector maintains the all the add DatagramChannels and the
+ * created SocketChannels. Further, the SelectionKey of the DatagramChannel and
+ * the SocketChannel is injected with the channel's associated reader.
+ *
+ * All ChannelReaders will get throttled by the unavailability of buffers in 
the
+ * provided BufferPool. This is designed to create back pressure.
+ *
+ */
+public final class ChannelListener {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ChannelListener.class);
+    private final ScheduledExecutorService executor;
+    private final Selector serverSocketSelector; // used to listen for new 
connections
+    private final Selector socketChannelSelector; // used to listen on 
existing connections
+    private final ChannelDispatcher channelDispatcher;
+    private final BufferPool bufferPool;
+    private final int initialBufferPoolSize;
+    private volatile long channelReaderFrequencyMSecs = 50;
+
+    public ChannelListener(final int threadPoolSize, final 
StreamConsumerFactory consumerFactory, final BufferPool bufferPool, int timeout,
+            TimeUnit unit, final boolean readSingleDatagram) throws 
IOException {
+        this.executor = Executors.newScheduledThreadPool(threadPoolSize + 1); 
// need to allow for long running ChannelDispatcher thread
+        this.serverSocketSelector = Selector.open();
+        this.socketChannelSelector = Selector.open();
+        this.bufferPool = bufferPool;
+        this.initialBufferPoolSize = bufferPool.size();
+        channelDispatcher = new ChannelDispatcher(serverSocketSelector, 
socketChannelSelector, executor, consumerFactory, bufferPool,
+                timeout, unit, readSingleDatagram);
+        executor.schedule(channelDispatcher, 50, TimeUnit.MILLISECONDS);
+    }
+
+    public void setChannelReaderSchedulingPeriod(final long period, final 
TimeUnit unit) {
+        channelReaderFrequencyMSecs = TimeUnit.MILLISECONDS.convert(period, 
unit);
+        channelDispatcher.setChannelReaderFrequency(period, unit);
+    }
+
+    /**
+     * Adds a server socket channel for listening to connections.
+     *
+     * @param nicIPAddress - if null binds to wildcard address
+     * @param port - port to bind to
+     * @param receiveBufferSize - size of OS receive buffer to request. If less
+     * than 0 then will not be set and OS default will win.
+     * @throws IOException if unable to add socket
+     */
+    public void addServerSocket(final InetAddress nicIPAddress, final int 
port, final int receiveBufferSize)
+            throws IOException {
+        final ServerSocketChannel ssChannel = ServerSocketChannel.open();
+        ssChannel.configureBlocking(false);
+        if (receiveBufferSize > 0) {
+            ssChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
receiveBufferSize);
+            final int actualReceiveBufSize = 
ssChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+            if (actualReceiveBufSize < receiveBufferSize) {
+                LOGGER.warn(this + " attempted to set TCP Receive Buffer Size 
to "
+                        + receiveBufferSize + " bytes but could only set to " 
+ actualReceiveBufSize
+                        + "bytes. You may want to consider changing the 
Operating System's "
+                        + "maximum receive buffer");
+            }
+        }
+        ssChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+        ssChannel.bind(new InetSocketAddress(nicIPAddress, port));
+        ssChannel.register(serverSocketSelector, SelectionKey.OP_ACCEPT);
+    }
+
+    /**
+     * Binds to listen for data grams on the given local IPAddress/port
+     *
+     * @param nicIPAddress - if null will listen on wildcard address, which
+     * means datagrams will be received on all local network interfaces.
+     * Otherwise, will bind to the provided IP address associated with some 
NIC.
+     * @param port - the port to listen on
+     * @param receiveBufferSize - the number of bytes to request for a receive
+     * buffer from OS
+     * @throws IOException if unable to add channel
+     */
+    public void addDatagramChannel(final InetAddress nicIPAddress, final int 
port, final int receiveBufferSize)
+            throws IOException {
+        final DatagramChannel dChannel = 
createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
+        dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
+    }
+
+    /**
+     * Binds to listen for data grams on the given local IPAddress/port and
+     * restricts receipt of datagrams to those from the provided host and port,
+     * must specify both. This improves performance for datagrams coming from a
+     * sender that is known a-priori.
+     *
+     * @param nicIPAddress - if null will listen on wildcard address, which
+     * means datagrams will be received on all local network interfaces.
+     * Otherwise, will bind to the provided IP address associated with some 
NIC.
+     * @param port - the port to listen on. This is used to provide a 
well-known
+     * destination for a sender.
+     * @param receiveBufferSize - the number of bytes to request for a receive
+     * buffer from OS
+     * @param sendingHost - the hostname, or IP address, of the sender of
+     * datagrams. Only datagrams from this host will be received. If this is
+     * null the wildcard ip is used, which means datagrams may be received from
+     * any network interface on the local host.
+     * @param sendingPort - the port used by the sender of datagrams. Only
+     * datagrams from this port will be received.
+     * @throws IOException if unable to add channel
+     */
+    public void addDatagramChannel(final InetAddress nicIPAddress, final int 
port, final int receiveBufferSize, final String sendingHost,
+            final Integer sendingPort) throws IOException {
+
+        if (sendingHost == null || sendingPort == null) {
+            addDatagramChannel(nicIPAddress, port, receiveBufferSize);
+            return;
+        }
+        final DatagramChannel dChannel = 
createAndBindDatagramChannel(nicIPAddress, port, receiveBufferSize);
+        dChannel.connect(new InetSocketAddress(sendingHost, sendingPort));
+        dChannel.register(socketChannelSelector, SelectionKey.OP_READ);
+    }
+
+    private DatagramChannel createAndBindDatagramChannel(final InetAddress 
nicIPAddress, final int port, final int receiveBufferSize)
+            throws IOException {
+        final DatagramChannel dChannel = DatagramChannel.open();
+        dChannel.configureBlocking(false);
+        if (receiveBufferSize > 0) {
+            dChannel.setOption(StandardSocketOptions.SO_RCVBUF, 
receiveBufferSize);
+            final int actualReceiveBufSize = 
dChannel.getOption(StandardSocketOptions.SO_RCVBUF);
+            if (actualReceiveBufSize < receiveBufferSize) {
+                LOGGER.warn(this + " attempted to set UDP Receive Buffer Size 
to "
+                        + receiveBufferSize + " bytes but could only set to " 
+ actualReceiveBufSize
+                        + "bytes. You may want to consider changing the 
Operating System's "
+                        + "maximum receive buffer");
+            }
+        }
+        dChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+        dChannel.bind(new InetSocketAddress(nicIPAddress, port));
+        return dChannel;
+    }
+
+    public void shutdown(final long period, final TimeUnit timeUnit) {
+        channelDispatcher.stop();
+        for (SelectionKey selectionKey : socketChannelSelector.keys()) {
+            final AbstractChannelReader reader = (AbstractChannelReader) 
selectionKey.attachment();
+            selectionKey.cancel();
+            if (reader != null) {
+                while (!reader.isClosed()) {
+                    try {
+                        Thread.sleep(channelReaderFrequencyMSecs);
+                    } catch (InterruptedException e) {
+                    }
+                }
+                final ScheduledFuture<?> readerFuture = 
reader.getScheduledFuture();
+                readerFuture.cancel(false);
+            }
+            IOUtils.closeQuietly(selectionKey.channel()); // should already be 
closed via reader, but if reader did not exist...
+        }
+        IOUtils.closeQuietly(socketChannelSelector);
+
+        for (SelectionKey selectionKey : serverSocketSelector.keys()) {
+            selectionKey.cancel();
+            IOUtils.closeQuietly(selectionKey.channel());
+        }
+        IOUtils.closeQuietly(serverSocketSelector);
+        executor.shutdown();
+        try {
+            executor.awaitTermination(period, timeUnit);
+        } catch (final InterruptedException ex) {
+            LOGGER.warn("Interrupted while trying to shutdown executor");
+        }
+        final int currentBufferPoolSize = bufferPool.size();
+        final String warning = (currentBufferPoolSize != 
initialBufferPoolSize) ? "Initial buffer count=" + initialBufferPoolSize
+                + " Current buffer count=" + currentBufferPoolSize
+                + " Could indicate a buffer leak.  Ensure all consumers are 
executed until they complete." : "";
+        LOGGER.info("Channel listener shutdown. " + warning);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
new file mode 100644
index 0000000..a4670b9
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/DatagramChannelReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+public final class DatagramChannelReader extends AbstractChannelReader {
+
+    public static final int MAX_UDP_PACKET_SIZE = 65507;
+
+    private final boolean readSingleDatagram;
+
+    public DatagramChannelReader(final String id, final SelectionKey key, 
final BufferPool empties, final StreamConsumerFactory consumerFactory,
+            final boolean readSingleDatagram) {
+        super(id, key, empties, consumerFactory);
+        this.readSingleDatagram = readSingleDatagram;
+    }
+
+    /**
+     * Will receive UDP data from channel and won't receive anything unless the
+     * given buffer has enough space for at least one full max udp packet.
+     *
+     * @param key selection key
+     * @param buffer to fill
+     * @return bytes read
+     * @throws IOException if error filling buffer from channel
+     */
+    @Override
+    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) 
throws IOException {
+        final DatagramChannel dChannel = (DatagramChannel) key.channel();
+        final int initialBufferPosition = buffer.position();
+        while (buffer.remaining() > MAX_UDP_PACKET_SIZE && key.isValid() && 
key.isReadable()) {
+            if (dChannel.receive(buffer) == null || readSingleDatagram) {
+                break;
+            }
+        }
+        return buffer.position() - initialBufferPosition;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
new file mode 100644
index 0000000..29c2973
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/SocketChannelReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import org.apache.nifi.io.nio.consumer.StreamConsumerFactory;
+
+public final class SocketChannelReader extends AbstractChannelReader {
+
+    public SocketChannelReader(final String id, final SelectionKey key, final 
BufferPool empties, final StreamConsumerFactory consumerFactory) {
+        super(id, key, empties, consumerFactory);
+    }
+
+    /**
+     * Receives TCP data from the socket channel for the given key.
+     *
+     * @param key selection key
+     * @param buffer byte buffer to fill
+     * @return bytes read
+     * @throws IOException if error reading bytes
+     */
+    @Override
+    protected int fillBuffer(final SelectionKey key, final ByteBuffer buffer) 
throws IOException {
+        int bytesRead = 0;
+        final SocketChannel sChannel = (SocketChannel) key.channel();
+        while (key.isValid() && key.isReadable()) {
+            bytesRead = sChannel.read(buffer);
+            if (bytesRead <= 0) {
+                break;
+            }
+        }
+        return bytesRead;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
new file mode 100644
index 0000000..bb57e26
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/AbstractStreamConsumer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.consumer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.nifi.io.nio.BufferPool;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ *
+ */
+public abstract class AbstractStreamConsumer implements StreamConsumer {
+
+    private final String uniqueId;
+    private BufferPool bufferPool = null;
+    private final BlockingQueue<ByteBuffer> filledBuffers = new 
LinkedBlockingQueue<>();
+    private final AtomicBoolean streamEnded = new AtomicBoolean(false);
+    private final AtomicBoolean consumerEnded = new AtomicBoolean(false);
+
+    public AbstractStreamConsumer(final String id) {
+        uniqueId = id;
+    }
+
+    @Override
+    public final void setReturnBufferQueue(final BufferPool returnQueue) {
+        bufferPool = returnQueue;
+    }
+
+    @Override
+    public final void addFilledBuffer(final ByteBuffer buffer) {
+        if (isConsumerFinished()) {
+            buffer.clear();
+            bufferPool.returnBuffer(buffer, buffer.remaining());
+        } else {
+            filledBuffers.add(buffer);
+        }
+    }
+
+    @Override
+    public final void process() throws IOException {
+        if (isConsumerFinished()) {
+            return;
+        }
+        if (streamEnded.get() && filledBuffers.isEmpty()) {
+            consumerEnded.set(true);
+            onConsumerDone();
+            return;
+        }
+        final ByteBuffer buffer = filledBuffers.poll();
+        if (buffer != null) {
+            final int bytesToProcess = buffer.remaining();
+            try {
+                processBuffer(buffer);
+            } finally {
+                buffer.clear();
+                bufferPool.returnBuffer(buffer, bytesToProcess);
+            }
+        }
+    }
+
+    protected abstract void processBuffer(ByteBuffer buffer) throws 
IOException;
+
+    @Override
+    public final void signalEndOfStream() {
+        streamEnded.set(true);
+    }
+
+    /**
+     * Convenience method that is called when the consumer is done processing
+     * based on being told the signal is end of stream and has processed all
+     * given buffers.
+     */
+    protected void onConsumerDone() {
+    }
+
+    @Override
+    public final boolean isConsumerFinished() {
+        return consumerEnded.get();
+    }
+
+    @Override
+    public final String getId() {
+        return uniqueId;
+    }
+
+    @Override
+    public final boolean equals(final Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        if (obj == this) {
+            return true;
+        }
+        if (obj.getClass() != getClass()) {
+            return false;
+        }
+        AbstractStreamConsumer rhs = (AbstractStreamConsumer) obj;
+        return new 
EqualsBuilder().appendSuper(super.equals(obj)).append(uniqueId, 
rhs.uniqueId).isEquals();
+    }
+
+    @Override
+    public final int hashCode() {
+        return new HashCodeBuilder(19, 23).append(uniqueId).toHashCode();
+    }
+
+    @Override
+    public final String toString() {
+        return new ToStringBuilder(this, 
ToStringStyle.NO_FIELD_NAMES_STYLE).append(uniqueId).toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
new file mode 100644
index 0000000..cac8d8b
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.consumer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.nifi.io.nio.BufferPool;
+
+/**
+ * A StreamConsumer must be thread safe. It may be accessed concurrently by a
+ * thread providing data to process and another thread that is processing that
+ * data.
+ *
+ */
+public interface StreamConsumer {
+
+    /**
+     * Will be called once just after construction. It provides the queue to
+     * which processed and emptied and cleared buffers must be returned. For
+     * each time <code>addFilledBuffer</code> is called there should be an
+     * associated add to this given queue. If not, buffers will run out and all
+     * stream processing will halt. READ THIS!!!
+     *
+     * @param returnQueue pool of buffers to use
+     */
+    void setReturnBufferQueue(BufferPool returnQueue);
+
+    /**
+     * Will be called by the thread that produces byte buffers with available
+     * data to be processed. If the consumer is finished this should simply
+     * return the given buffer to the return buffer queue (after it is cleared)
+     *
+     * @param buffer filled buffer
+     */
+    void addFilledBuffer(ByteBuffer buffer);
+
+    /**
+     * Will be called by the thread that executes the consumption of data. May
+     * be called many times though once <code>isConsumerFinished</code> returns
+     * true this method will likely do nothing.
+     *
+     * @throws java.io.IOException if there is an issue processing
+     */
+    void process() throws IOException;
+
+    /**
+     * Called once the end of the input stream is detected
+     */
+    void signalEndOfStream();
+
+    /**
+     * If true signals the consumer is done consuming data and will not process
+     * any more buffers.
+     *
+     * @return true if finished
+     */
+    boolean isConsumerFinished();
+
+    /**
+     * Uniquely identifies the consumer
+     *
+     * @return identifier of consumer
+     */
+    String getId();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
new file mode 100644
index 0000000..ba858b6
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/nio/consumer/StreamConsumerFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.nio.consumer;
+
+/**
+ *
+ */
+public interface StreamConsumerFactory {
+
+    StreamConsumer newInstance(String streamId);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
new file mode 100644
index 0000000..9c6cb82
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SSLContextFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.nifi.util.NiFiProperties;
+
+public class SSLContextFactory {
+
+    private final String keystore;
+    private final char[] keystorePass;
+    private final String keystoreType;
+    private final String truststore;
+    private final char[] truststorePass;
+    private final String truststoreType;
+
+    private final KeyManager[] keyManagers;
+    private final TrustManager[] trustManagers;
+
+    public SSLContextFactory(final NiFiProperties properties) throws 
NoSuchAlgorithmException, CertificateException, FileNotFoundException, 
IOException, KeyStoreException, UnrecoverableKeyException {
+        keystore = properties.getProperty(NiFiProperties.SECURITY_KEYSTORE);
+        keystorePass = 
getPass(properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
+        keystoreType = 
properties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE);
+
+        truststore = 
properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE);
+        truststorePass = 
getPass(properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
+        truststoreType = 
properties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE);
+
+        // prepare the keystore
+        final KeyStore keyStore = KeyStore.getInstance(keystoreType);
+        keyStore.load(new FileInputStream(keystore), keystorePass);
+        final KeyManagerFactory keyManagerFactory = 
KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(keyStore, keystorePass);
+
+        // prepare the truststore
+        final KeyStore trustStore = KeyStore.getInstance(truststoreType);
+        trustStore.load(new FileInputStream(truststore), truststorePass);
+        final TrustManagerFactory trustManagerFactory = 
TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        trustManagerFactory.init(trustStore);
+
+        keyManagers = keyManagerFactory.getKeyManagers();
+        trustManagers = trustManagerFactory.getTrustManagers();
+    }
+
+    private static char[] getPass(final String password) {
+        return password == null ? null : password.toCharArray();
+    }
+
+    /**
+     * Creates a SSLContext instance using the given information.
+     *
+     *
+     * @return a SSLContext instance
+     * @throws java.security.KeyStoreException if problem with keystore
+     * @throws java.io.IOException if unable to create context
+     * @throws java.security.NoSuchAlgorithmException if algorithm isn't known
+     * @throws java.security.cert.CertificateException if certificate is 
invalid
+     * @throws java.security.UnrecoverableKeyException if the key cannot be 
recovered
+     * @throws java.security.KeyManagementException if the key is improper
+     */
+    public SSLContext createSslContext() throws KeyStoreException, 
IOException, NoSuchAlgorithmException, CertificateException,
+            UnrecoverableKeyException, KeyManagementException {
+
+        // initialize the ssl context
+        final SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(keyManagers, trustManagers, new SecureRandom());
+        sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
+
+        return sslContext;
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
new file mode 100644
index 0000000..d6aca92
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/ServerSocketConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.SSLContext;
+
+public final class ServerSocketConfiguration {
+
+    private boolean needClientAuth;
+    private Integer socketTimeout;
+    private Boolean reuseAddress;
+    private Integer receiveBufferSize;
+    private SSLContextFactory sslContextFactory;
+
+    public ServerSocketConfiguration() {
+    }
+
+    public SSLContext createSSLContext()
+            throws KeyManagementException, NoSuchAlgorithmException, 
UnrecoverableKeyException, KeyStoreException, CertificateException, 
FileNotFoundException, IOException {
+        return sslContextFactory == null ? null : 
sslContextFactory.createSslContext();
+    }
+
+    public void setSSLContextFactory(final SSLContextFactory 
sslContextFactory) {
+        this.sslContextFactory = sslContextFactory;
+    }
+
+    public Integer getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(Integer socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    public boolean getNeedClientAuth() {
+        return needClientAuth;
+    }
+
+    public void setNeedClientAuth(boolean needClientAuth) {
+        this.needClientAuth = needClientAuth;
+    }
+
+    public Boolean getReuseAddress() {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress(Boolean reuseAddress) {
+        this.reuseAddress = reuseAddress;
+    }
+
+    public Integer getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(Integer receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
new file mode 100644
index 0000000..8b803dc
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketConfiguration.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+
+import javax.net.ssl.SSLContext;
+
+public final class SocketConfiguration {
+
+    private Integer socketTimeout;
+    private Integer receiveBufferSize;
+    private Integer sendBufferSize;
+    private Boolean reuseAddress;
+    private Boolean keepAlive;
+    private Boolean oobInline;
+    private Boolean tcpNoDelay;
+    private Integer trafficClass;
+    private SSLContextFactory sslContextFactory;
+
+    public SSLContext createSSLContext()
+            throws KeyManagementException, NoSuchAlgorithmException, 
UnrecoverableKeyException, KeyStoreException, CertificateException, 
FileNotFoundException, IOException {
+        return sslContextFactory == null ? null : 
sslContextFactory.createSslContext();
+    }
+
+    public void setSSLContextFactory(final SSLContextFactory 
sslContextFactory) {
+        this.sslContextFactory = sslContextFactory;
+    }
+
+    public Integer getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(Integer socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    public Boolean getReuseAddress() {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress(Boolean reuseAddress) {
+        this.reuseAddress = reuseAddress;
+    }
+
+    public Boolean getKeepAlive() {
+        return keepAlive;
+    }
+
+    public void setKeepAlive(Boolean keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public Boolean getOobInline() {
+        return oobInline;
+    }
+
+    public void setOobInline(Boolean oobInline) {
+        this.oobInline = oobInline;
+    }
+
+    public Integer getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize(Integer receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public Integer getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize(Integer sendBufferSize) {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public Boolean getTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    public void setTcpNoDelay(Boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    public Integer getTrafficClass() {
+        return trafficClass;
+    }
+
+    public void setTrafficClass(Integer trafficClass) {
+        this.trafficClass = trafficClass;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
new file mode 100644
index 0000000..b509035
--- /dev/null
+++ 
b/nifi-commons/nifi-socket-utils/src/main/java/org/apache/nifi/io/socket/SocketListener.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.io.socket;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.logging.NiFiLog;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements a listener for TCP/IP messages sent over unicast socket.
+ *
+ */
+public abstract class SocketListener {
+
+    private static final int DEFAULT_SHUTDOWN_LISTENER_SECONDS = 5;
+    private static final Logger logger = new 
NiFiLog(LoggerFactory.getLogger(SocketListener.class));
+    private volatile ExecutorService executorService;  // volatile to 
guarantee most current value is visible
+    private volatile ServerSocket serverSocket;        // volatile to 
guarantee most current value is visible
+    private final int numThreads;
+    private final int port;
+    private final ServerSocketConfiguration configuration;
+    private final AtomicInteger shutdownListenerSeconds = new 
AtomicInteger(DEFAULT_SHUTDOWN_LISTENER_SECONDS);
+
+    public SocketListener(
+            final int numThreads,
+            final int port,
+            final ServerSocketConfiguration configuration) {
+
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException("Number of threads may not be 
less than or equal to zero.");
+        } else if (configuration == null) {
+            throw new IllegalArgumentException("Server socket configuration 
may not be null.");
+        }
+
+        this.numThreads = numThreads;
+        this.port = port;
+        this.configuration = configuration;
+    }
+
+    /**
+     * Implements the action to perform when a new socket request is received.
+     * This class will close the socket.
+     *
+     * @param socket the socket
+     */
+    public abstract void dispatchRequest(final Socket socket);
+
+    public void start() throws IOException {
+
+        if (isRunning()) {
+            return;
+        }
+
+        try {
+            serverSocket = SocketUtils.createServerSocket(port, configuration);
+        } catch (KeyManagementException | UnrecoverableKeyException | 
NoSuchAlgorithmException | KeyStoreException | CertificateException e) {
+            throw new IOException(e);
+        }
+
+        final ThreadFactory defaultThreadFactory = 
Executors.defaultThreadFactory();
+        executorService = Executors.newFixedThreadPool(numThreads, new 
ThreadFactory() {
+            private final AtomicLong threadCounter = new AtomicLong(0L);
+
+            @Override
+            public Thread newThread(final Runnable r) {
+                final Thread newThread = defaultThreadFactory.newThread(r);
+                newThread.setName("Process NCM Request-" + 
threadCounter.incrementAndGet());
+                return newThread;
+            }
+        });
+
+        final ExecutorService runnableExecServiceRef = executorService;
+        final ServerSocket runnableServerSocketRef = serverSocket;
+
+        final Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (runnableExecServiceRef.isShutdown() == false) {
+                    Socket socket = null;
+                    try {
+                        try {
+                            socket = runnableServerSocketRef.accept();
+                            if (configuration.getSocketTimeout() != null) {
+                                
socket.setSoTimeout(configuration.getSocketTimeout());
+                            }
+                        } catch (final SocketTimeoutException ste) {
+                            // nobody connected to us. Go ahead and call 
closeQuietly just to make sure we don't leave
+                            // any sockets lingering
+                            SocketUtils.closeQuietly(socket);
+                            continue;
+                        } catch (final SocketException se) {
+                            logger.warn("Failed to communicate with " + 
(socket == null ? "Unknown Host" : socket.getInetAddress().getHostName()) + " 
due to " + se, se);
+                            SocketUtils.closeQuietly(socket);
+                            continue;
+                        } catch (final Throwable t) {
+                            logger.warn("Socket Listener encountered 
exception: " + t, t);
+                            SocketUtils.closeQuietly(socket);
+                            continue;
+                        }
+
+                        final Socket finalSocket = socket;
+                        runnableExecServiceRef.execute(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    dispatchRequest(finalSocket);
+                                } catch (final Throwable t) {
+                                    logger.warn("Dispatching socket request 
encountered exception due to: " + t, t);
+                                } finally {
+                                    SocketUtils.closeQuietly(finalSocket);
+                                }
+                            }
+                        });
+                    } catch (final Throwable t) {
+                        logger.error("Socket Listener encountered exception: " 
+ t, t);
+                        SocketUtils.closeQuietly(socket);
+                    }
+                }
+            }
+        });
+        t.setName("Cluster Socket Listener");
+        t.start();
+    }
+
+    public boolean isRunning() {
+        return (executorService != null && executorService.isShutdown() == 
false);
+    }
+
+    public void stop() throws IOException {
+
+        if (isRunning() == false) {
+            return;
+        }
+
+        // shutdown executor service
+        try {
+            if (getShutdownListenerSeconds() <= 0) {
+                executorService.shutdownNow();
+            } else {
+                executorService.shutdown();
+            }
+            executorService.awaitTermination(getShutdownListenerSeconds(), 
TimeUnit.SECONDS);
+        } catch (final InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        } finally {
+            if (executorService.isTerminated()) {
+                logger.info("Socket Listener has been terminated 
successfully.");
+            } else {
+                logger.warn("Socket Listener has not terminated properly.  
There exists an uninterruptable thread that will take an indeterminate amount 
of time to stop.");
+            }
+        }
+
+        // shutdown server socket
+        SocketUtils.closeQuietly(serverSocket);
+
+    }
+
+    public int getShutdownListenerSeconds() {
+        return shutdownListenerSeconds.get();
+    }
+
+    public void setShutdownListenerSeconds(final int shutdownListenerSeconds) {
+        this.shutdownListenerSeconds.set(shutdownListenerSeconds);
+    }
+
+    public ServerSocketConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public int getPort() {
+        if (isRunning()) {
+            return serverSocket.getLocalPort();
+        } else {
+            return port;
+        }
+    }
+
+}

Reply via email to