This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a125dc  NIFI-8616 Refactored PutTCP, PutUDP, and PutSplunk to use 
Netty nifi-event-transport classes
9a125dc is described below

commit 9a125dcaafbaa922066f067d0c5e65f7286f8d44
Author: Nathan Gough <[email protected]>
AuthorDate: Tue Jun 22 15:45:38 2021 -0400

    NIFI-8616 Refactored PutTCP, PutUDP, and PutSplunk to use Netty 
nifi-event-transport classes
    
    This closes #5182
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../ByteArrayMessageNettyEventServerFactory.java   |   1 -
 .../netty/ByteArrayNettyEventSenderFactory.java    |  46 +++
 .../transport/netty/DelimitedInputStream.java      |  43 +++
 .../event/transport/netty/NettyEventSender.java    |  15 +-
 .../transport/netty/NettyEventSenderFactory.java   |  27 +-
 .../transport/netty/NettyEventServerFactory.java   |  14 +-
 .../netty/StreamingNettyEventSenderFactory.java    |  51 +++
 .../netty/codec/InputStreamMessageEncoder.java     |  47 +++
 .../transport/netty/NettyEventSenderTest.java      |   2 +-
 .../nifi-processor-utils/pom.xml                   |   6 +
 .../util/put/AbstractPutEventProcessor.java        | 214 +++----------
 .../apache/nifi/processors/splunk/PutSplunk.java   |  79 ++---
 .../nifi/processors/splunk/TestPutSplunk.java      | 341 +++++++--------------
 .../apache/nifi/processors/standard/PutTCP.java    | 114 ++-----
 .../apache/nifi/processors/standard/PutUDP.java    |  45 +--
 .../nifi/processors/standard/TestListenSyslog.java |   1 -
 .../nifi/processors/standard/TestPutTCP.java       | 109 +++----
 .../nifi/processors/standard/TestPutUDP.java       |  94 +++---
 .../processors/standard/util/TCPTestServer.java    | 166 ----------
 19 files changed, 556 insertions(+), 859 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
index 9647565..b6cd5bf 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
@@ -55,7 +55,6 @@ public class ByteArrayMessageNettyEventServerFactory extends 
NettyEventServerFac
                                                    final int maxFrameLength,
                                                    final 
BlockingQueue<ByteArrayMessage> messages) {
         super(address, port, protocol);
-
         final LogExceptionChannelHandler logExceptionChannelHandler = new 
LogExceptionChannelHandler(log);
         final ByteArrayMessageChannelHandler byteArrayMessageChannelHandler = 
new ByteArrayMessageChannelHandler(messages);
 
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
new file mode 100644
index 0000000..71219b3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.bytes.ByteArrayEncoder;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import 
org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
+import org.apache.nifi.logging.ComponentLog;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Netty Event Sender Factory for byte array messages
+ */
+public class ByteArrayNettyEventSenderFactory extends 
NettyEventSenderFactory<byte[]> {
+    /**
+     * Netty Event Sender Factory using byte array
+     *
+     * @param log Component Log
+     * @param address Remote Address
+     * @param port Remote Port Number
+     * @param protocol Channel Protocol
+     */
+    public ByteArrayNettyEventSenderFactory(final ComponentLog log, final 
String address, final int port, final TransportProtocol protocol) {
+        super(address, port, protocol);
+        final List<ChannelHandler> handlers = new ArrayList<>();
+        handlers.add(new LogExceptionChannelHandler(log));
+        handlers.add(new ByteArrayEncoder());
+        setHandlerSupplier(() -> handlers);
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/DelimitedInputStream.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/DelimitedInputStream.java
new file mode 100644
index 0000000..4f6a4b1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/DelimitedInputStream.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A delimited InputStream which retains a delimiter for later use
+ */
+public class DelimitedInputStream extends InputStream {
+
+    private final InputStream in;
+    private final byte[] delimiter;
+
+    public DelimitedInputStream(final InputStream in, final byte[] delimiter) {
+        this.in = in;
+        this.delimiter = delimiter;
+    }
+
+    public byte[] getDelimiter() {
+        return delimiter;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return in.read();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
index 7d9346c..4cc2b24 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
@@ -38,17 +38,21 @@ class NettyEventSender<T> implements EventSender<T> {
 
     private final SocketAddress remoteAddress;
 
+    private boolean singleEventPerConnection;
+
     /**
      * Netty Channel Event Sender with Event Loop Group and Channel Pool
      *
      * @param group Event Loop Group
      * @param channelPool Channel Pool
      * @param remoteAddress Remote Address
+     * @param singleEventPerConnection If true, send a single event per 
connection, and then close it.
      */
-    NettyEventSender(final EventLoopGroup group, final ChannelPool 
channelPool, final SocketAddress remoteAddress) {
+    NettyEventSender(final EventLoopGroup group, final ChannelPool 
channelPool, final SocketAddress remoteAddress, final boolean 
singleEventPerConnection) {
         this.group = group;
         this.channelPool = channelPool;
         this.remoteAddress = remoteAddress;
+        this.singleEventPerConnection = singleEventPerConnection;
     }
 
     /**
@@ -65,7 +69,7 @@ class NettyEventSender<T> implements EventSender<T> {
                 final ChannelFuture channelFuture = 
channel.writeAndFlush(event);
                 channelFuture.syncUninterruptibly();
             } finally {
-                channelPool.release(channel);
+                releaseChannel(channel);
             }
         } catch (final Exception e) {
             throw new EventException(getChannelMessage("Send Failed"), e);
@@ -97,4 +101,11 @@ class NettyEventSender<T> implements EventSender<T> {
     private String getChannelMessage(final String message) {
         return String.format("%s Remote Address [%s]", message, remoteAddress);
     }
+
+    private void releaseChannel(final Channel channel) {
+        if (singleEventPerConnection) {
+            channel.close();
+        }
+        channelPool.release(channel);
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
index 85024ff..3a83fbd 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
@@ -49,6 +49,7 @@ import java.util.function.Supplier;
  */
 public class NettyEventSenderFactory<T> extends EventLoopGroupFactory 
implements EventSenderFactory<T> {
     private static final int MAX_PENDING_ACQUIRES = 1024;
+    private Integer socketSendBufferSize = null;
 
     private final String address;
 
@@ -64,6 +65,8 @@ public class NettyEventSenderFactory<T> extends 
EventLoopGroupFactory implements
 
     private SSLContext sslContext;
 
+    private boolean singleEventPerConnection = false;
+
     public NettyEventSenderFactory(final String address, final int port, final 
TransportProtocol protocol) {
         this.address = address;
         this.port = port;
@@ -71,6 +74,15 @@ public class NettyEventSenderFactory<T> extends 
EventLoopGroupFactory implements
     }
 
     /**
+     * Set Socket Send Buffer Size for TCP Sockets
+     *
+     * @param socketSendBufferSize Send Buffer size can be null to use default 
setting
+     */
+    public void setSocketSendBufferSize(final Integer socketSendBufferSize) {
+        this.socketSendBufferSize = socketSendBufferSize;
+    }
+
+    /**
      * Set Channel Handler Supplier
      *
      * @param handlerSupplier Channel Handler Supplier
@@ -107,6 +119,16 @@ public class NettyEventSenderFactory<T> extends 
EventLoopGroupFactory implements
     }
 
     /**
+     * Send a single event for the session and close the connection. Useful 
for endpoints which cannot be configured
+     * to listen for a delimiter.
+     *
+     * @param singleEventPerConnection true if the connection should be ended 
after an event is sent
+     */
+    public void setSingleEventPerConnection(final boolean 
singleEventPerConnection) {
+        this.singleEventPerConnection = singleEventPerConnection;
+    }
+
+    /**
      * Get Event Sender with connected Channel
      *
      * @return Connected Event Sender
@@ -130,12 +152,15 @@ public class NettyEventSenderFactory<T> extends 
EventLoopGroupFactory implements
     private void setChannelOptions(final Bootstrap bootstrap) {
         final int timeoutMilliseconds = (int) timeout.toMillis();
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
timeoutMilliseconds);
+        if (socketSendBufferSize != null) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, socketSendBufferSize);
+        }
     }
 
     private EventSender<T> getConfiguredEventSender(final Bootstrap bootstrap) 
{
         final SocketAddress remoteAddress = bootstrap.config().remoteAddress();
         final ChannelPool channelPool = getChannelPool(bootstrap);
-        return new NettyEventSender<>(bootstrap.config().group(), channelPool, 
remoteAddress);
+        return new NettyEventSender<>(bootstrap.config().group(), channelPool, 
remoteAddress, singleEventPerConnection);
     }
 
     private ChannelPool getChannelPool(final Bootstrap bootstrap) {
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
index 6d23161..161622e 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import org.apache.nifi.event.transport.EventException;
@@ -107,16 +108,25 @@ public class NettyEventServerFactory extends 
EventLoopGroupFactory implements Ev
     @Override
     public EventServer getEventServer() {
         final AbstractBootstrap<?, ?> bootstrap = getBootstrap();
+        setBufferSize(bootstrap);
         final EventLoopGroup group = getEventLoopGroup();
         bootstrap.group(group);
         return getBoundEventServer(bootstrap, group);
     }
 
+    private void setBufferSize(AbstractBootstrap<?, ?> bootstrap) {
+        if (socketReceiveBuffer != null) {
+            bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new 
FixedRecvByteBufAllocator(socketReceiveBuffer));
+        }
+    }
+
     private AbstractBootstrap<?, ?> getBootstrap() {
         if (TransportProtocol.UDP.equals(protocol)) {
             final Bootstrap bootstrap = new Bootstrap();
             bootstrap.channel(NioDatagramChannel.class);
             bootstrap.handler(new 
StandardChannelInitializer<>(handlerSupplier));
+
             return bootstrap;
         } else {
             final ServerBootstrap bootstrap = new ServerBootstrap();
@@ -127,10 +137,6 @@ public class NettyEventServerFactory extends 
EventLoopGroupFactory implements Ev
                 bootstrap.childHandler(new 
ServerSslHandlerChannelInitializer<>(handlerSupplier, sslContext, clientAuth));
             }
 
-            if (socketReceiveBuffer != null) {
-                bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
-            }
-
             return bootstrap;
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
new file mode 100644
index 0000000..efb130b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import 
org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
+import org.apache.nifi.event.transport.netty.codec.InputStreamMessageEncoder;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.InputStream;
+import java.util.Arrays;
+
+/**
+ * Netty Event Sender Factory for messages in an InputStream
+ */
+public class StreamingNettyEventSenderFactory extends 
NettyEventSenderFactory<InputStream> {
+    /**
+     * Netty Event Sender Factory using InputStream. Uses a custom 
InputStreamMessageEncoder and a ChunkedWriteHandler.
+     *
+     * @param log Component Log
+     * @param address Remote Address
+     * @param port Remote Port Number
+     * @param protocol Channel Protocol
+     */
+    public StreamingNettyEventSenderFactory(final ComponentLog log, final 
String address, final int port, final TransportProtocol protocol) {
+        super(address, port, protocol);
+        final LogExceptionChannelHandler logExceptionChannelHandler = new 
LogExceptionChannelHandler(log);
+        final InputStreamMessageEncoder inputStreamMessageEncoder = new 
InputStreamMessageEncoder();
+
+        setHandlerSupplier(() -> Arrays.asList(
+                logExceptionChannelHandler,
+                new ChunkedWriteHandler(),
+                inputStreamMessageEncoder
+        ));
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/InputStreamMessageEncoder.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/InputStreamMessageEncoder.java
new file mode 100644
index 0000000..d7f4d84
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/InputStreamMessageEncoder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.codec;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.stream.ChunkedStream;
+import org.apache.nifi.event.transport.netty.DelimitedInputStream;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Message encoder for an InputStream, which wraps the stream in a 
ChunkedStream for use with a ChunkedWriteHandler. Can add a delimiter
+ * to the end of the output objects if the InputStream is a 
DelimitedInputStream.
+ */
+@ChannelHandler.Sharable
+public class InputStreamMessageEncoder extends 
MessageToMessageEncoder<InputStream> {
+
+    @Override
+    protected void encode(ChannelHandlerContext context, InputStream 
messageStream, List<Object> out) throws Exception {
+        ChunkedStream chunkedMessage = new ChunkedStream(messageStream);
+        out.add(chunkedMessage);
+
+        // If the message being sent requires a delimiter added to the end of 
the message, provide a DelimitedInputStream
+        if (messageStream instanceof DelimitedInputStream) {
+            DelimitedInputStream delimStream = (DelimitedInputStream) 
messageStream;
+            out.add(Unpooled.wrappedBuffer(delimStream.getDelimiter()));
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
index 9341136..5fb613c 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
@@ -47,7 +47,7 @@ public class NettyEventSenderTest {
     @Test
     public void testClose() {
         final SocketAddress socketAddress = 
InetSocketAddress.createUnresolved(LOCALHOST, 
NetworkUtils.getAvailableTcpPort());
-        final NettyEventSender<?> sender = new NettyEventSender<>(group, 
channelPool, socketAddress);
+        final NettyEventSender<?> sender = new NettyEventSender<>(group, 
channelPool, socketAddress, false);
         doReturn(shutdownFuture).when(group).shutdownGracefully();
         sender.close();
 
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index 7fed8b7..b0daa96 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -98,6 +98,12 @@
             <artifactId>nifi-mock-record-utils</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-event-transport</artifactId>
+            <version>1.14.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 8c3f58d..1b2798c 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -20,22 +20,25 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.event.transport.EventSender;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
-import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
-import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
-import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
 import org.apache.nifi.ssl.SSLContextService;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -85,10 +88,8 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
 
     // Putting these properties here so sub-classes don't have to redefine 
them, but they are
     // not added to the properties by default since not all processors may 
need them
-
     public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", 
"TCP");
     public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", 
"UDP");
-
     public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
             .Builder().name("Protocol")
             .description("The protocol for communication.")
@@ -115,6 +116,7 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
             .required(true)
             .defaultValue("UTF-8")
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
     public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
             .name("Timeout")
@@ -122,6 +124,7 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
             .required(false)
             .defaultValue("10 seconds")
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
     public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
             .name("Outgoing Message Delimiter")
@@ -140,7 +143,6 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
             .defaultValue("false")
             .allowableValues("true", "false")
             .build();
-
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
             .name("SSL Context Service")
             .description("The Controller Service to use in order to obtain an 
SSL Context. If this property is set, " +
@@ -158,14 +160,11 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
             .description("FlowFiles that failed to send to the destination are 
sent out this relationship.")
             .build();
 
-    private static final long SENDER_POOL_POLL_TIMEOUT = 250;
-    private static final TimeUnit SENDER_POOL_POLL_TIMEOUT_UNIT = 
TimeUnit.MILLISECONDS;
-
     private Set<Relationship> relationships;
     private List<PropertyDescriptor> descriptors;
 
     protected volatile String transitUri;
-    private volatile BlockingQueue<ChannelSender> senderPool;
+    protected EventSender eventSender;
 
     protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new 
LinkedBlockingQueue<>();
     protected final Set<FlowFileMessageBatch> activeBatches = 
Collections.synchronizedSet(new HashSet<>());
@@ -177,6 +176,7 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
         descriptors.add(PORT);
         descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
         descriptors.add(IDLE_EXPIRATION);
+        descriptors.add(TIMEOUT);
         descriptors.addAll(getAdditionalProperties());
         this.descriptors = Collections.unmodifiableList(descriptors);
 
@@ -218,18 +218,14 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException {
         // initialize the queue of senders, one per task, senders will get 
created on the fly in onTrigger
-        this.senderPool = new 
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+        this.eventSender = getEventSender(context);
         this.transitUri = createTransitUri(context);
     }
 
     @OnStopped
-    public void closeSenders() {
-        if (senderPool != null) {
-            ChannelSender sender = pollSenderPool();
-            while (sender != null) {
-                sender.close();
-                sender = pollSenderPool();
-            }
+    public void closeSenders() throws Exception {
+        if (eventSender != null) {
+            eventSender.close();
         }
     }
 
@@ -243,145 +239,30 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
      */
     protected abstract String createTransitUri(final ProcessContext context);
 
-    /**
-     * Sub-classes create a ChannelSender given a context.
-     *
-     * @param context the current context
-     * @return an implementation of ChannelSender
-     * @throws IOException if an error occurs creating the ChannelSender
-     */
-    protected abstract ChannelSender createSender(final ProcessContext 
context) throws IOException;
-
-    /**
-     * Close any senders that haven't been active with in the given threshold
-     *
-     * @param idleThreshold the threshold to consider a sender as idle
-     * @return the number of connections that were closed as a result of being 
idle
-     */
-    protected PruneResult pruneIdleSenders(final long idleThreshold) {
-        int numClosed = 0;
-        int numConsidered = 0;
-
-        long currentTime = System.currentTimeMillis();
-        final List<ChannelSender> putBack = new ArrayList<>();
-
-        // if a connection hasn't been used with in the threshold then it gets 
closed
-        ChannelSender sender;
-        while ((sender = pollSenderPool()) != null) {
-            numConsidered++;
-            if (currentTime > (sender.getLastUsed() + idleThreshold)) {
-                getLogger().debug("Closing idle connection...");
-                sender.close();
-                numClosed++;
-            } else {
-                putBack.add(sender);
-            }
-        }
-
-        // re-queue senders that weren't idle, but if the queue is full then 
close the sender
-        for (ChannelSender putBackSender : putBack) {
-            boolean returned = senderPool.offer(putBackSender);
-            if (!returned) {
-                putBackSender.close();
-            }
-        }
-
-        return new PruneResult(numClosed, numConsidered);
-    }
-
-    /**
-     * Helper for sub-classes to create a sender.
-     *
-     * @param protocol the protocol for the sender
-     * @param host the host to send to
-     * @param port the port to send to
-     * @param timeout the timeout for connecting and communicating over the 
channel
-     * @param maxSendBufferSize the maximum size of the socket send buffer
-     * @param sslContext an SSLContext, or null if not using SSL
-     *
-     * @return a ChannelSender based on the given properties
-     *
-     * @throws IOException if an error occurs creating the sender
-     */
-    protected ChannelSender createSender(final String protocol,
-                                         final String host,
-                                         final int port,
-                                         final int timeout,
-                                         final int maxSendBufferSize,
-                                         final SSLContext sslContext) throws 
IOException {
-
-        ChannelSender sender;
-        if (protocol.equals(UDP_VALUE.getValue())) {
-            sender = new DatagramChannelSender(host, port, maxSendBufferSize, 
getLogger());
-        } else {
-            // if an SSLContextService is provided then we make a secure sender
-            if (sslContext != null) {
-                sender = new SSLSocketChannelSender(host, port, 
maxSendBufferSize, sslContext, getLogger());
-            } else {
-                sender = new SocketChannelSender(host, port, 
maxSendBufferSize, getLogger());
-            }
-        }
-
-        sender.setTimeout(timeout);
-        sender.open();
-        return sender;
-    }
-
-    /**
-     * Helper method to acquire an available ChannelSender from the pool. If 
the pool is empty then the a new sender is created.
-     *
-     * @param context
-     *            - the current process context.
-     *
-     * @param session
-     *            - the current process session.
-     * @param flowFile
-     *            - the FlowFile being processed in this session.
-     *
-     * @return ChannelSender - the sender that has been acquired or null if no 
sender is available and a new sender cannot be created.
-     */
-    protected ChannelSender acquireSender(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile) {
-        ChannelSender sender = pollSenderPool();
-        if (sender == null) {
-            try {
-                getLogger().debug("No available connections, creating a new 
one...");
-                sender = createSender(context);
-            } catch (IOException e) {
-                getLogger().error("No available connections, and unable to 
create a new one, transferring {} to failure",
-                        new Object[]{flowFile}, e);
-                session.transfer(flowFile, REL_FAILURE);
-                session.commitAsync();
-                context.yield();
-                sender = null;
-            }
+    protected EventSender<?> getEventSender(final ProcessContext context) {
+        final String hostname = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final int port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+        final String protocol = getProtocol(context);
+        final boolean singleEventPerConnection = 
context.getProperty(CONNECTION_PER_FLOWFILE).getValue() != null ? 
context.getProperty(CONNECTION_PER_FLOWFILE).asBoolean() : false;
+
+        final NettyEventSenderFactory factory = 
getNettyEventSenderFactory(hostname, port, protocol);
+        factory.setThreadNamePrefix(String.format("%s[%s]", 
getClass().getSimpleName(), getIdentifier()));
+        factory.setWorkerThreads(context.getMaxConcurrentTasks());
+        factory.setMaxConnections(context.getMaxConcurrentTasks());
+        
factory.setSocketSendBufferSize(context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue());
+        factory.setSingleEventPerConnection(singleEventPerConnection);
+
+        final int timeout = 
context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        factory.setTimeout(Duration.ofMillis(timeout));
+
+        final PropertyValue sslContextServiceProperty = 
context.getProperty(SSL_CONTEXT_SERVICE);
+        if (sslContextServiceProperty.isSet()) {
+            final SSLContextService sslContextService = 
sslContextServiceProperty.asControllerService(SSLContextService.class);
+            final SSLContext sslContext = sslContextService.createContext();
+            factory.setSslContext(sslContext);
         }
 
-        return sender;
-    }
-
-
-    /**
-     * Helper method to relinquish the ChannelSender back to the pool. If the 
sender is disconnected or the pool is full
-     * then the sender is closed and discarded.
-     *
-     * @param sender the sender to return or close
-     */
-    protected void relinquishSender(final ChannelSender sender) {
-        if (sender != null) {
-            // if the connection is still open then then try to return the 
sender to the pool.
-            if (sender.isConnected()) {
-                boolean returned = senderPool.offer(sender);
-                // if the pool is full then close the sender.
-                if (!returned) {
-                    getLogger().debug("Sender wasn't returned because queue 
was full, closing sender");
-                    sender.close();
-                }
-            } else {
-                // probably already closed here, but quietly close anyway to 
be safe.
-                getLogger().debug("Sender is not connected, closing sender");
-                sender.close();
-            }
-        }
+        return factory.getEventSender();
     }
 
     /**
@@ -612,22 +493,11 @@ public abstract class AbstractPutEventProcessor extends 
AbstractSessionFactoryPr
         return delimiter;
     }
 
-    /**
-     * Poll Sender Pool when not empty using a timeout to avoid blocking 
indefinitely
-     *
-     * @return Channel Sender or null when not found
-     */
-    private ChannelSender pollSenderPool() {
-        ChannelSender channelSender = null;
-
-        if (!senderPool.isEmpty()) {
-            try {
-                channelSender = senderPool.poll(SENDER_POOL_POLL_TIMEOUT, 
SENDER_POOL_POLL_TIMEOUT_UNIT);
-            } catch (final InterruptedException e) {
-                getLogger().warn("Interrupted while polling for 
ChannelSender", e);
-            }
-        }
+    protected String getProtocol(final ProcessContext context) {
+        return context.getProperty(PROTOCOL).getValue();
+    }
 
-        return channelSender;
+    protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final 
String hostname, final int port, final String protocol) {
+        return new ByteArrayNettyEventSenderFactory(getLogger(), hostname, 
port, TransportProtocol.valueOf(protocol));
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
index 275e5fa..0a354d6 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -25,9 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import javax.net.ssl.SSLContext;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -36,15 +34,14 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.event.transport.EventException;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
@@ -109,23 +106,6 @@ public class PutSplunk extends AbstractPutEventProcessor {
     }
 
     @Override
-    protected ChannelSender createSender(ProcessContext context) throws 
IOException {
-        final int port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
-        final String host = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final String protocol = context.getProperty(PROTOCOL).getValue();
-        final int timeout = 
context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final int maxSendBuffer = 
context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
-        SSLContext sslContext = null;
-        if (sslContextService != null) {
-            sslContext = sslContextService.createContext();
-        }
-
-        return createSender(protocol, host, port, timeout, maxSendBuffer, 
sslContext);
-    }
-
-    @Override
     public void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory) throws ProcessException {
         // first complete any batches from previous executions
         FlowFileMessageBatch batch;
@@ -136,45 +116,34 @@ public class PutSplunk extends AbstractPutEventProcessor {
         // create a session and try to get a FlowFile, if none available then 
close any idle senders
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
+
         if (flowFile == null) {
-            final PruneResult result = 
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            // yield if we closed an idle connection, or if there were no 
connections in the first place
-            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && 
result.getNumConsidered() == 0)) {
-                context.yield();
-            }
             return;
         }
 
-        // get a sender from the pool, or create a new one if the pool is empty
-        // if we can't create a new connection then route flow files to 
failure and yield
-        // acquireSender will handle the routing to failure and yielding
-        ChannelSender sender = acquireSender(context, session, flowFile);
-        if (sender == null) {
-            return;
+        String delimiter = 
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+        if (delimiter != null) {
+            delimiter = delimiter.replace("\\n", "\n").replace("\\r", 
"\r").replace("\\t", "\t");
         }
 
+        // if no delimiter then treat the whole FlowFile as a single message
         try {
-            String delimiter = 
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
-            if (delimiter != null) {
-                delimiter = delimiter.replace("\\n", "\n").replace("\\r", 
"\r").replace("\\t", "\t");
-            }
-
-            // if no delimiter then treat the whole FlowFile as a single 
message
             if (delimiter == null) {
-                processSingleMessage(context, session, flowFile, sender);
+                processSingleMessage(context, session, flowFile);
             } else {
-                processDelimitedMessages(context, session, flowFile, sender, 
delimiter);
+                processDelimitedMessages(context, session, flowFile, 
delimiter);
             }
-
-        } finally {
-            relinquishSender(sender);
+        } catch (EventException e) {
+            session.transfer(flowFile, REL_FAILURE);
+            session.commitAsync();
+            context.yield();
         }
     }
 
     /**
      * Send the entire FlowFile as a single message.
      */
-    private void processSingleMessage(ProcessContext context, ProcessSession 
session, FlowFile flowFile, ChannelSender sender) {
+    private void processSingleMessage(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile) {
         // copy the contents of the FlowFile to the ByteArrayOutputStream
         final ByteArrayOutputStream baos = new 
ByteArrayOutputStream((int)flowFile.getSize() + 1);
         session.read(flowFile, new InputStreamCallback() {
@@ -200,20 +169,15 @@ public class PutSplunk extends AbstractPutEventProcessor {
         activeBatches.add(messageBatch);
 
         // attempt to send the data and add the appropriate range
-        try {
-            sender.send(buf);
-            messageBatch.addSuccessfulRange(0L, flowFile.getSize());
-        } catch (IOException e) {
-            messageBatch.addFailedRange(0L, flowFile.getSize(), e);
-            context.yield();
-        }
+        eventSender.sendEvent(buf);
+        messageBatch.addSuccessfulRange(0L, flowFile.getSize());
     }
 
     /**
      * Read delimited messages from the FlowFile tracking which messages are 
sent successfully.
      */
     private void processDelimitedMessages(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile,
-                                          final ChannelSender sender, final 
String delimiter) {
+                                          final String delimiter) {
 
         final String protocol = context.getProperty(PROTOCOL).getValue();
         final byte[] delimiterBytes = 
delimiter.getBytes(StandardCharsets.UTF_8);
@@ -264,13 +228,9 @@ public class PutSplunk extends AbstractPutEventProcessor {
                                 // If the message has no data, ignore it.
                                 if (data.length != 0) {
                                     final long rangeStart = messageStartOffset;
-                                    try {
-                                        sender.send(data);
-                                        
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset);
-                                        messagesSent.incrementAndGet();
-                                    } catch (final IOException e) {
-                                        
messageBatch.addFailedRange(rangeStart, messageEndOffset, e);
-                                    }
+                                    eventSender.sendEvent(data);
+                                    
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset);
+                                    messagesSent.incrementAndGet();
                                 }
 
                                 // reset BAOS so that we can start a new 
message.
@@ -320,5 +280,4 @@ public class PutSplunk extends AbstractPutEventProcessor {
             return Arrays.copyOfRange(baos.toByteArray(), 0, length);
         }
     }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
index a0713a3..272faa5 100644
--- 
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
+++ 
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
@@ -16,44 +16,64 @@
  */
 package org.apache.nifi.processors.splunk;
 
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import 
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import javax.net.ssl.SSLContext;
-import java.io.IOException;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 
 public class TestPutSplunk {
 
     private TestRunner runner;
-    private TestablePutSplunk proc;
-    private CapturingChannelSender sender;
+    private BlockingQueue<ByteArrayMessage> messages;
+    private EventServer eventServer;
+    private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
+    private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
+    private static final Charset CHARSET = StandardCharsets.UTF_8;
+    private final static int VALID_LARGE_FILE_SIZE = 32768;
+    private static final String LOCALHOST = "localhost";
 
     @Before
-    public void init() {
-        ComponentLog logger = Mockito.mock(ComponentLog.class);
-        sender = new CapturingChannelSender("localhost", 12345, 0, logger);
-        proc = new TestablePutSplunk(sender);
+    public void setup() throws Exception {
+        runner = TestRunners.newTestRunner(PutSplunk.class);
+    }
 
-        runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutSplunk.PORT, "12345");
+    @After
+    public void cleanup() {
+        runner.shutdown();
+        shutdownServer();
+    }
+
+    private void shutdownServer() {
+        if (eventServer != null) {
+            eventServer.shutdown();
+        }
     }
 
-    @Test
-    public void testUDPSendWholeFlowFile() {
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testUDPSendWholeFlowFile() throws Exception {
+        createTestServer(TransportProtocol.UDP);
+        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, 
OUTGOING_MESSAGE_DELIMITER);
         final String message = "This is one message, should send the whole 
FlowFile";
 
         runner.enqueue(message);
@@ -63,16 +83,13 @@ public class TestPutSplunk {
         final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
         mockFlowFile.assertContentEquals(message);
 
-        Assert.assertEquals(1, sender.getMessages().size());
-        Assert.assertEquals(message, sender.getMessages().get(0));
+        checkReceivedAllData(message);
     }
 
-    @Test
-    public void testTCPSendWholeFlowFile() {
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
-
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testTCPSendWholeFlowFile() throws Exception {
+        createTestServer(TransportProtocol.TCP);
         final String message = "This is one message, should send the whole 
FlowFile";
-
         runner.enqueue(message);
         runner.run(1);
         runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
@@ -80,13 +97,29 @@ public class TestPutSplunk {
         final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
         mockFlowFile.assertContentEquals(message);
 
-        Assert.assertEquals(1, sender.getMessages().size());
-        Assert.assertEquals(message + "\n", sender.getMessages().get(0));
+        checkReceivedAllData(message);
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testTCPSendMultipleFlowFiles() throws Exception {
+        createTestServer(TransportProtocol.TCP);
+
+        final String message = "This is one message, should send the whole 
FlowFile";
+
+        runner.enqueue(message);
+        runner.enqueue(message);
+        runner.run(2);
+        runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 2);
+
+        final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+        mockFlowFile.assertContentEquals(message);
+
+        checkReceivedAllData(message, message);
     }
 
-    @Test
-    public void testTCPSendWholeFlowFileAlreadyHasNewLine() {
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testTCPSendWholeFlowFileAlreadyHasNewLine() throws Exception {
+        createTestServer(TransportProtocol.TCP);
 
         final String message = "This is one message, should send the whole 
FlowFile\n";
 
@@ -97,14 +130,12 @@ public class TestPutSplunk {
         final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
         mockFlowFile.assertContentEquals(message);
 
-        Assert.assertEquals(1, sender.getMessages().size());
-        Assert.assertEquals(message, sender.getMessages().get(0));
+        checkReceivedAllData(message.trim());
     }
 
-    @Test
-    public void testUDPSendDelimitedMessages() {
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
-
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testUDPSendDelimitedMessages() throws Exception {
+        createTestServer(TransportProtocol.UDP);
         final String delimiter = "DD";
         runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
 
@@ -117,17 +148,14 @@ public class TestPutSplunk {
         final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
         mockFlowFile.assertContentEquals(message);
 
-        Assert.assertEquals(3, sender.getMessages().size());
-        Assert.assertEquals("This is message 1", sender.getMessages().get(0));
-        Assert.assertEquals("This is message 2", sender.getMessages().get(1));
-        Assert.assertEquals("This is message 3", sender.getMessages().get(2));
+        checkReceivedAllData("This is message 1", "This is message 2", "This 
is message 3");
     }
 
-    @Test
-    public void testTCPSendDelimitedMessages() {
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testTCPSendDelimitedMessages() throws Exception {
+        createTestServer(TransportProtocol.TCP);
         final String delimiter = "DD";
         runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
 
         // no delimiter at end
         final String message = "This is message 1DDThis is message 2DDThis is 
message 3";
@@ -139,17 +167,15 @@ public class TestPutSplunk {
         final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
         mockFlowFile.assertContentEquals(message);
 
-        Assert.assertEquals(3, sender.getMessages().size());
-        Assert.assertEquals("This is message 1\n", 
sender.getMessages().get(0));
-        Assert.assertEquals("This is message 2\n", 
sender.getMessages().get(1));
-        Assert.assertEquals("This is message 3\n", 
sender.getMessages().get(2));
+        checkReceivedAllData("This is message 1", "This is message 2", "This 
is message 3");
     }
 
-    @Test
-    public void testTCPSendDelimitedMessagesWithEL() {
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testTCPSendDelimitedMessagesWithEL() throws Exception {
+        createTestServer(TransportProtocol.TCP);
+
         final String delimiter = "DD";
         runner.setProperty(PutSplunk.MESSAGE_DELIMITER, "${flow.file.delim}");
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
 
         // no delimiter at end
         final String message = "This is message 1DDThis is message 2DDThis is 
message 3";
@@ -164,17 +190,14 @@ public class TestPutSplunk {
         final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
         mockFlowFile.assertContentEquals(message);
 
-        Assert.assertEquals(3, sender.getMessages().size());
-        Assert.assertEquals("This is message 1\n", 
sender.getMessages().get(0));
-        Assert.assertEquals("This is message 2\n", 
sender.getMessages().get(1));
-        Assert.assertEquals("This is message 3\n", 
sender.getMessages().get(2));
+        checkReceivedAllData("This is message 1", "This is message 2", "This 
is message 3");
     }
 
-    @Test
-    public void testTCPSendDelimitedMessagesEndsWithDelimiter() {
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testTCPSendDelimitedMessagesEndsWithDelimiter() throws 
Exception {
+        createTestServer(TransportProtocol.TCP);
         final String delimiter = "DD";
         runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
 
         // delimiter at end
         final String message = "This is message 1DDThis is message 2DDThis is 
message 3DD";
@@ -186,17 +209,16 @@ public class TestPutSplunk {
         final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
         mockFlowFile.assertContentEquals(message);
 
-        Assert.assertEquals(3, sender.getMessages().size());
-        Assert.assertEquals("This is message 1\n", 
sender.getMessages().get(0));
-        Assert.assertEquals("This is message 2\n", 
sender.getMessages().get(1));
-        Assert.assertEquals("This is message 3\n", 
sender.getMessages().get(2));
+        checkReceivedAllData("This is message 1", "This is message 2", "This 
is message 3");
+
     }
 
-    @Test
-    public void testTCPSendDelimitedMessagesWithNewLineDelimiter() {
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testTCPSendDelimitedMessagesWithNewLineDelimiter() throws 
Exception {
+        createTestServer(TransportProtocol.TCP);
         final String delimiter = "\\n";
         runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+        runner.setProperty(PutSplunk.CHARSET, "UTF-8");
 
         final String message = "This is message 1\nThis is message 2\nThis is 
message 3";
 
@@ -207,89 +229,12 @@ public class TestPutSplunk {
         final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
         mockFlowFile.assertContentEquals(message);
 
-        Assert.assertEquals(3, sender.getMessages().size());
-        Assert.assertEquals("This is message 1\n", 
sender.getMessages().get(0));
-        Assert.assertEquals("This is message 2\n", 
sender.getMessages().get(1));
-        Assert.assertEquals("This is message 3\n", 
sender.getMessages().get(2));
-    }
-
-    @Test
-    public void testTCPSendDelimitedMessagesWithErrors() {
-        sender.setErrorStart(3);
-        sender.setErrorEnd(4);
-
-        final String delimiter = "DD";
-        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
-
-        // no delimiter at end
-        final String success = "This is message 1DDThis is message 2DD";
-        final String failure = "This is message 3DDThis is message 4";
-        final String message = success + failure;
-
-        runner.enqueue(message);
-        runner.run(1);
-        runner.assertTransferCount(PutSplunk.REL_SUCCESS, 1);
-        runner.assertTransferCount(PutSplunk.REL_FAILURE, 1);
-
-        // first two messages should went out success
-        final MockFlowFile successFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
-        successFlowFile.assertContentEquals(success);
-
-        // second two messages should went to failure
-        final MockFlowFile failureFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0);
-        failureFlowFile.assertContentEquals(failure);
-
-        // should only have the first two messages
-        Assert.assertEquals(2, sender.getMessages().size());
-        Assert.assertEquals("This is message 1\n", 
sender.getMessages().get(0));
-        Assert.assertEquals("This is message 2\n", 
sender.getMessages().get(1));
+        checkReceivedAllData("This is message 1", "This is message 2", "This 
is message 3");
     }
 
-    @Test
-    public void testTCPSendDelimitedMessagesWithErrorsInMiddle() {
-        sender.setErrorStart(3);
-        sender.setErrorEnd(4);
-
-        final String delimiter = "DD";
-        runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
-
-        // no delimiter at end
-        final String success = "This is message 1DDThis is message 2DD";
-        final String failure = "This is message 3DDThis is message 4DD";
-        final String success2 = "This is message 5DDThis is message 6DDThis is 
message 7DD";
-        final String message = success + failure + success2;
-
-        runner.enqueue(message);
-        runner.run(1);
-        runner.assertTransferCount(PutSplunk.REL_SUCCESS, 2);
-        runner.assertTransferCount(PutSplunk.REL_FAILURE, 1);
-
-        // first two messages should have went out success
-        final MockFlowFile successFlowFile1 = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
-        successFlowFile1.assertContentEquals(success);
-
-        // last three messages should have went out success
-        final MockFlowFile successFlowFile2 = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(1);
-        successFlowFile2.assertContentEquals(success2);
-
-        // second two messages should have went to failure
-        final MockFlowFile failureFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0);
-        failureFlowFile.assertContentEquals(failure);
-
-        // should only have the first two messages
-        Assert.assertEquals(5, sender.getMessages().size());
-        Assert.assertEquals("This is message 1\n", 
sender.getMessages().get(0));
-        Assert.assertEquals("This is message 2\n", 
sender.getMessages().get(1));
-        Assert.assertEquals("This is message 5\n", 
sender.getMessages().get(2));
-        Assert.assertEquals("This is message 6\n", 
sender.getMessages().get(3));
-        Assert.assertEquals("This is message 7\n", 
sender.getMessages().get(4));
-    }
-
-    @Test
-    public void testCompletingPreviousBatchOnNextExecution() {
-        runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testCompletingPreviousBatchOnNextExecution() throws Exception {
+        createTestServer(TransportProtocol.UDP);
 
         final String message = "This is one message, should send the whole 
FlowFile";
 
@@ -300,15 +245,13 @@ public class TestPutSplunk {
         final MockFlowFile mockFlowFile = 
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
         mockFlowFile.assertContentEquals(message);
 
-        Assert.assertEquals(1, sender.getMessages().size());
-        Assert.assertEquals(message, sender.getMessages().get(0));
+        checkReceivedAllData(message);
     }
 
-    @Test
-    public void testUnableToCreateConnectionShouldRouteToFailure() {
-        PutSplunk proc = new UnableToConnectPutSplunk();
-        runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutSplunk.PORT, "12345");
+    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    public void testUnableToCreateConnectionShouldRouteToFailure() throws 
InterruptedException {
+        // Set an unreachable port
+        runner.setProperty(PutSplunk.PORT, 
String.valueOf(NetworkUtils.getAvailableUdpPort()));
 
         final String message = "This is one message, should send the whole 
FlowFile";
 
@@ -317,84 +260,38 @@ public class TestPutSplunk {
         runner.assertAllFlowFilesTransferred(PutSplunk.REL_FAILURE, 1);
     }
 
-    /**
-     * Extend PutSplunk to use a CapturingChannelSender.
-     */
-    private static class UnableToConnectPutSplunk extends PutSplunk {
-
-        @Override
-        protected ChannelSender createSender(String protocol, String host, int 
port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws 
IOException {
-            throw new IOException("Unable to create connection");
-        }
+    private void createTestServer(final TransportProtocol protocol) {
+        createTestServer(LOCALHOST, protocol, null);
     }
 
-    /**
-     * Extend PutSplunk to use a CapturingChannelSender.
-     */
-    private static class TestablePutSplunk extends PutSplunk {
-
-        private ChannelSender sender;
-
-        public TestablePutSplunk(ChannelSender channelSender) {
-            this.sender = channelSender;
-        }
-
-        @Override
-        protected ChannelSender createSender(String protocol, String host, int 
port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws 
IOException {
-            return sender;
+    private void createTestServer(final String address, final 
TransportProtocol protocol, final SSLContext sslContext) {
+        if (protocol == TransportProtocol.UDP) {
+            createTestServer(address, NetworkUtils.getAvailableUdpPort(), 
protocol, sslContext);
+        } else {
+            createTestServer(address, NetworkUtils.getAvailableTcpPort(), 
protocol, sslContext);
         }
     }
 
-
-    /**
-     * A ChannelSender that captures each message that was sent.
-     */
-    private static class CapturingChannelSender extends ChannelSender {
-
-        private List<String> messages = new ArrayList<>();
-        private int count = 0;
-        private int errorStart = -1;
-        private int errorEnd = -1;
-
-        public CapturingChannelSender(String host, int port, int 
maxSendBufferSize, ComponentLog logger) {
-            super(host, port, maxSendBufferSize, logger);
-        }
-
-        @Override
-        public void open() throws IOException {
-
-        }
-
-        @Override
-        protected void write(byte[] data) throws IOException {
-            count++;
-            if (errorStart > 0 && count >= errorStart && errorEnd > 0 && count 
<= errorEnd) {
-                throw new IOException("this is an error");
-            }
-            messages.add(new String(data, StandardCharsets.UTF_8));
-        }
-
-        @Override
-        public boolean isConnected() {
-            return false;
-        }
-
-        @Override
-        public void close() {
-
-        }
-
-        public List<String> getMessages() {
-            return messages;
+    private void createTestServer(final String address, final int port, final 
TransportProtocol protocol, final SSLContext sslContext) {
+        messages = new LinkedBlockingQueue<>();
+        runner.setProperty(PutSplunk.PROTOCOL, protocol.name());
+        runner.setProperty(PutSplunk.PORT, String.valueOf(port));
+        final byte[] delimiter = OUTGOING_MESSAGE_DELIMITER.getBytes(CHARSET);
+        NettyEventServerFactory serverFactory = new 
ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, 
protocol, delimiter, VALID_LARGE_FILE_SIZE, messages);
+        if (sslContext != null) {
+            serverFactory.setSslContext(sslContext);
         }
+        eventServer = serverFactory.getEventServer();
+    }
 
-        public void setErrorStart(int errorStart) {
-            this.errorStart = errorStart;
+    private void checkReceivedAllData(final String... sentData) throws 
Exception {
+        // Each sent item must arrive in order with identical bytes; take() blocks until a message is queued.
+        for (String item : sentData) {
+            ByteArrayMessage packet = messages.take();
+            assertNotNull(packet);
+            assertArrayEquals(item.getBytes(CHARSET), packet.getMessage());
         }
 
-        public void setErrorEnd(int errorEnd) {
-            this.errorEnd = errorEnd;
-        }
+        assertNull("Unexpected extra messages found", messages.poll());
     }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 2c0f17c..da6e479 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
@@ -24,23 +23,20 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.DelimitedInputStream;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.StreamingNettyEventSenderFactory;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
-import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
-import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.StopWatch;
-
-import javax.net.ssl.SSLContext;
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
@@ -97,31 +93,6 @@ import java.util.concurrent.TimeUnit;
 public class PutTCP extends AbstractPutEventProcessor {
 
     /**
-     * Creates a concrete instance of a ChannelSender object to use for 
sending messages over a TCP stream.
-     *
-     * @param context
-     *            - the current process context.
-     *
-     * @return ChannelSender object.
-     */
-    @Override
-    protected ChannelSender createSender(final ProcessContext context) throws 
IOException {
-        final String protocol = TCP_VALUE.getValue();
-        final String hostname = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final int port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
-        final int timeout = 
context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final int bufferSize = 
context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final SSLContextService sslContextService = (SSLContextService) 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService();
-
-        SSLContext sslContext = null;
-        if (sslContextService != null) {
-            sslContext = sslContextService.createContext();
-        }
-
-        return createSender(protocol, hostname, port, timeout, bufferSize, 
sslContext);
-    }
-
-    /**
      * Creates a Universal Resource Identifier (URI) for this processor. 
Constructs a URI of the form TCP://< host >:< port > where the host and port
      * values are taken from the configured property values.
      *
@@ -168,46 +139,25 @@ public class PutTCP extends AbstractPutEventProcessor {
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
-            final PruneResult result = 
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            // yield if we closed an idle connection, or if there were no 
connections in the first place
-            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && 
result.getNumConsidered() == 0)) {
-                context.yield();
-            }
             return;
         }
 
-        ChannelSender sender = acquireSender(context, session, flowFile);
-        if (sender == null) {
-            return;
-        }
-
-        // really shouldn't happen since we know the protocol is TCP here, but 
this is more graceful so we
-        // can cast to a SocketChannelSender later in order to obtain the 
OutputStream
-        if (!(sender instanceof SocketChannelSender)) {
-            getLogger().error("Processor can only be used with a 
SocketChannelSender, but obtained: " + sender.getClass().getCanonicalName());
-            context.yield();
-            return;
-        }
-
-        boolean closeSender = isConnectionPerFlowFile(context);
         try {
-            // We might keep the connection open across invocations of the 
processor so don't auto-close this
-            final OutputStream out = 
((SocketChannelSender)sender).getOutputStream();
-            final String delimiter = getOutgoingMessageDelimiter(context, 
flowFile);
-
-            final StopWatch stopWatch = new StopWatch(true);
-            try (final InputStream rawIn = session.read(flowFile);
-                 final BufferedInputStream in = new 
BufferedInputStream(rawIn)) {
-                IOUtils.copy(in, out);
-                if (delimiter != null) {
-                    final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
-                    out.write(delimiter.getBytes(charSet), 0, 
delimiter.length());
+            StopWatch stopWatch = new StopWatch(true);
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    InputStream event = in;
+
+                    String delimiter = getOutgoingMessageDelimiter(context, 
flowFile);
+                    if (delimiter != null) {
+                        final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
+                        event = new DelimitedInputStream(in, 
delimiter.getBytes(charSet));
+                    }
+
+                    eventSender.sendEvent(event);
                 }
-                out.flush();
-            } catch (final Exception e) {
-                closeSender = true;
-                throw e;
-            }
+            });
 
             session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
@@ -215,14 +165,6 @@ public class PutTCP extends AbstractPutEventProcessor {
         } catch (Exception e) {
             onFailure(context, session, flowFile);
             getLogger().error("Exception while handling a process session, 
transferring {} to failure.", new Object[] { flowFile }, e);
-        } finally {
-            if (closeSender) {
-                getLogger().debug("Closing sender");
-                sender.close();
-            } else {
-                getLogger().debug("Relinquishing sender");
-                relinquishSender(sender);
-            }
         }
     }
 
@@ -243,15 +185,13 @@ public class PutTCP extends AbstractPutEventProcessor {
         context.yield();
     }
 
-    /**
-     * Gets the current value of the "Connection Per FlowFile" property.
-     *
-     * @param context
-     *            - the current process context.
-     *
-     * @return boolean value - true if a connection per FlowFile is specified.
-     */
-    protected boolean isConnectionPerFlowFile(final ProcessContext context) {
-        return 
context.getProperty(CONNECTION_PER_FLOWFILE).getValue().equalsIgnoreCase("true");
+    @Override
+    protected String getProtocol(final ProcessContext context) {
+        return TCP_VALUE.getValue();
+    }
+
+    @Override
+    protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final 
String hostname, final int port, final String protocol) {
+        return new StreamingNettyEventSenderFactory(getLogger(), hostname, 
port, TransportProtocol.valueOf(protocol));
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
index 6bcff04..a6ce71c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
@@ -23,14 +23,12 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
@@ -82,24 +80,6 @@ import java.util.concurrent.TimeUnit;
 public class PutUDP extends AbstractPutEventProcessor {
 
     /**
-     * Creates a concrete instance of a ChannelSender object to use for 
sending UDP datagrams.
-     *
-     * @param context
-     *            - the current process context.
-     *
-     * @return ChannelSender object.
-     */
-    @Override
-    protected ChannelSender createSender(final ProcessContext context) throws 
IOException {
-        final String protocol = UDP_VALUE.getValue();
-        final String hostname = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
-        final int port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
-        final int bufferSize = 
context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-
-        return createSender(protocol, hostname, port, 0, bufferSize, null);
-    }
-
-    /**
      * Creates a Universal Resource Identifier (URI) for this processor. 
Constructs a URI of the form UDP://host:port where the host and port values are 
taken from the configured property values.
      *
      * @param context
@@ -131,31 +111,21 @@ public class PutUDP extends AbstractPutEventProcessor {
         final ProcessSession session = sessionFactory.createSession();
         final FlowFile flowFile = session.get();
         if (flowFile == null) {
-            final PruneResult result = 
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-            // yield if we closed an idle connection, or if there were no 
connections in the first place
-            if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 && 
result.getNumConsidered() == 0)) {
-                context.yield();
-            }
-            return;
-        }
-
-        ChannelSender sender = acquireSender(context, session, flowFile);
-        if (sender == null) {
             return;
         }
 
         try {
-            byte[] content = readContent(session, flowFile);
+            final byte[] content = readContent(session, flowFile);
             StopWatch stopWatch = new StopWatch(true);
-            sender.send(content);
+            if (content != null) {
+                eventSender.sendEvent(content);
+            }
             session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowFile, REL_SUCCESS);
             session.commitAsync();
         } catch (Exception e) {
             getLogger().error("Exception while handling a process session, 
transferring {} to failure.", new Object[] { flowFile }, e);
             onFailure(context, session, flowFile);
-        } finally {
-            relinquishSender(sender);
         }
     }
 
@@ -176,8 +146,6 @@ public class PutUDP extends AbstractPutEventProcessor {
         context.yield();
     }
 
-
-
     /**
      * Helper method to read the FlowFile content stream into a byte array.
      *
@@ -199,4 +167,9 @@ public class PutUDP extends AbstractPutEventProcessor {
 
         return baos.toByteArray();
     }
+
+    @Override
+    protected String getProtocol(final ProcessContext context) {
+        return UDP_VALUE.getValue();
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 815a639..55d083f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -164,7 +164,6 @@ public class TestListenSyslog {
         runner.run(1, STOP_ON_FINISH_DISABLED);
 
         sendMessages(protocol, port, LineEnding.UNIX, VALID_MESSAGE);
-
         runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
 
         final List<MockFlowFile> invalidFlowFiles = 
runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID);
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
index e608f13..2a9e60b 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
@@ -17,8 +17,12 @@
 
 package org.apache.nifi.processors.standard;
 
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.nifi.processors.standard.util.TCPTestServer;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import 
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.security.util.KeyStoreUtils;
 import org.apache.nifi.security.util.TlsConfiguration;
 import org.apache.nifi.ssl.SSLContextService;
@@ -32,15 +36,12 @@ import org.junit.Test;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
-import javax.net.ServerSocketFactory;
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocketFactory;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
@@ -52,7 +53,6 @@ public class TestPutTCP {
     private final static int MIN_VALID_PORT = 1;
     private final static int MAX_VALID_PORT = 65535;
     private final static int MAX_INVALID_PORT = 65536;
-    private final static int BUFFER_SIZE = 1024;
     private final static int VALID_LARGE_FILE_SIZE = 32768;
     private final static int VALID_SMALL_FILE_SIZE = 64;
     private final static int LOAD_TEST_ITERATIONS = 500;
@@ -68,16 +68,17 @@ public class TestPutTCP {
     @Rule
     public Timeout timeout = new Timeout(30, TimeUnit.SECONDS);
 
-    private TCPTestServer server;
+    private EventServer eventServer;
     private int port;
-    private ArrayBlockingQueue<List<Byte>> received;
+    private TransportProtocol PROTOCOL = TransportProtocol.TCP;
     private TestRunner runner;
+    private BlockingQueue<ByteArrayMessage> messages;
 
     @Before
     public void setup() throws Exception {
-        received = new ArrayBlockingQueue<>(BUFFER_SIZE);
         runner = TestRunners.newTestRunner(PutTCP.class);
         runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
+        port = NetworkUtils.getAvailableTcpPort();
     }
 
     @After
@@ -103,11 +104,10 @@ public class TestPutTCP {
 
     @Test
     public void testRunSuccess() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(VALID_FILES);
         assertMessagesReceived(VALID_FILES);
-        assertServerConnections(1);
     }
 
     @Test
@@ -116,7 +116,6 @@ public class TestPutTCP {
 
         final SSLContext sslContext = 
SslContextUtils.createSslContext(tlsConfiguration);
         assertNotNull("SSLContext not found", sslContext);
-
         final String identifier = SSLContextService.class.getName();
         final SSLContextService sslContextService = 
Mockito.mock(SSLContextService.class);
         Mockito.when(sslContextService.getIdentifier()).thenReturn(identifier);
@@ -124,28 +123,24 @@ public class TestPutTCP {
         runner.addControllerService(identifier, sslContextService);
         runner.enableControllerService(sslContextService);
         runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, identifier);
-
-        final SSLServerSocketFactory serverSocketFactory = 
sslContext.getServerSocketFactory();
-        createTestServer(OUTGOING_MESSAGE_DELIMITER, false, 
serverSocketFactory);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
false);
+        createTestServer(TCP_SERVER_ADDRESS, port, sslContext);
         sendTestData(VALID_FILES);
         assertMessagesReceived(VALID_FILES);
-        assertServerConnections(1);
     }
 
     @Test
     public void testRunSuccessServerVariableExpression() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, 
false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(VALID_FILES);
         assertMessagesReceived(VALID_FILES);
-        assertServerConnections(1);
     }
 
     @Test
     public void testRunSuccessPruneSenders() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(VALID_FILES);
         assertTransfers(VALID_FILES.length);
         assertMessagesReceived(VALID_FILES);
@@ -156,31 +151,28 @@ public class TestPutTCP {
         runner.clearTransferState();
         sendTestData(VALID_FILES);
         assertMessagesReceived(VALID_FILES);
-        assertServerConnections(2);
     }
 
     @Test
     public void testRunSuccessMultiCharDelimiter() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
         configureProperties(TCP_SERVER_ADDRESS, 
OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(VALID_FILES);
         assertMessagesReceived(VALID_FILES);
-        assertServerConnections(1);
     }
 
     @Test
     public void testRunSuccessConnectionPerFlowFile() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER, true);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
true);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(VALID_FILES);
         assertMessagesReceived(VALID_FILES);
-        assertServerConnections(VALID_FILES.length);
     }
 
     @Test
     public void testRunSuccessConnectionFailure() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(VALID_FILES);
         assertMessagesReceived(VALID_FILES);
 
@@ -189,67 +181,64 @@ public class TestPutTCP {
         Thread.sleep(500);
         runner.assertQueueEmpty();
 
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(VALID_FILES);
         assertMessagesReceived(VALID_FILES);
-        assertServerConnections(1);
     }
 
     @Test
     public void testRunSuccessEmptyFile() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         sendTestData(EMPTY_FILE);
         assertTransfers(1);
         runner.assertQueueEmpty();
-        assertServerConnections(1);
     }
 
     @Test
     public void testRunSuccessLargeValidFile() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
         configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
true);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
         sendTestData(testData);
         assertMessagesReceived(testData);
-        assertServerConnections(testData.length);
     }
 
     @Test
     public void testRunSuccessFiveHundredMessages() throws Exception {
-        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
false);
+        createTestServer(TCP_SERVER_ADDRESS, port);
         Thread.sleep(1000);
         final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
-        configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, 
false);
         sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
         assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
-        assertServerConnections(1);
     }
 
-    private void createTestServer(final String delimiter) throws Exception {
-        createTestServer(delimiter, false);
-    }
-
-    private void createTestServer(final String delimiter, final boolean 
closeOnMessageReceived) throws Exception {
-        createTestServer(delimiter, closeOnMessageReceived, 
ServerSocketFactory.getDefault());
+    private void createTestServer(final String address, final int port, final 
SSLContext sslContext) throws Exception {
+        messages = new LinkedBlockingQueue<>();
+        final byte[] delimiter = getDelimiter();
+        NettyEventServerFactory serverFactory = new 
ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, 
PROTOCOL, delimiter, VALID_LARGE_FILE_SIZE, messages);
+        if (sslContext != null) {
+            serverFactory.setSslContext(sslContext);
+        }
+        eventServer = serverFactory.getEventServer();
     }
 
-    private void createTestServer(final String delimiter, final boolean 
closeOnMessageReceived, final ServerSocketFactory serverSocketFactory) throws 
Exception {
-        server = new TCPTestServer(InetAddress.getByName(TCP_SERVER_ADDRESS), 
received, delimiter, closeOnMessageReceived);
-        server.startServer(serverSocketFactory);
-        port = server.getPort();
+    private void createTestServer(final String address, final int port) throws 
Exception {
+        createTestServer(address, port, null);
     }
 
     private void shutdownServer() {
-        if (server != null) {
-            server.shutdown();
+        if (eventServer != null) {
+            eventServer.shutdown();
         }
     }
 
     private void configureProperties(String host, String 
outgoingMessageDelimiter, boolean connectionPerFlowFile) {
         runner.setProperty(PutTCP.HOSTNAME, host);
         runner.setProperty(PutTCP.PORT, Integer.toString(port));
+
         if (outgoingMessageDelimiter != null) {
             runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER, 
outgoingMessageDelimiter);
         }
@@ -285,23 +274,16 @@ public class TestPutTCP {
     private void assertMessagesReceived(final String[] sentData, final int 
iterations) throws Exception {
         for (int i = 0; i < iterations; i++) {
             for (String item : sentData) {
-                final List<Byte> message = received.take();
+                final ByteArrayMessage message = messages.take();
                 assertNotNull(String.format("Message [%d] not found", i), 
message);
-                final Byte[] messageBytes = new Byte[message.size()];
-                assertArrayEquals(item.getBytes(), 
ArrayUtils.toPrimitive(message.toArray(messageBytes)));
+                org.junit.Assert.assertTrue("Received message not found in sent data", 
Arrays.asList(sentData).contains(new String(message.getMessage())));
             }
         }
 
         runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * 
iterations);
         runner.clearTransferState();
 
-        assertNull("Unexpected Message Found", received.poll());
-    }
-
-    private void assertServerConnections(final int connections) throws 
InterruptedException {
-        while (server.getTotalConnections() != connections) {
-            Thread.sleep(10);
-        }
+        assertNull("Unexpected extra messages found", messages.poll());
     }
 
     private String[] createContent(final int size) {
@@ -313,4 +295,13 @@ public class TestPutTCP {
 
         return new String[] { new String(content) };
     }
+
+    private byte[] getDelimiter() {
+        final String configured = 
runner.getProcessContext().getProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER).getValue();
+        if (configured == null) {
+            return null;
+        }
+        // A configured delimiter is encoded with the default charset
+        return configured.getBytes();
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
index 813a30a..69e162e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
@@ -20,10 +20,17 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import 
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
@@ -38,7 +45,9 @@ public class TestPutUDP {
     private final static String UDP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE 
+ "}";
     private final static String UNKNOWN_HOST = "fgdsfgsdffd";
     private final static String INVALID_IP_ADDRESS = "300.300.300.300";
-    private final static int BUFFER_SIZE = 1024;
+    private static final String DELIMITER = "\n";
+    private static final Charset CHARSET = StandardCharsets.UTF_8;
+    private final static int MAX_FRAME_LENGTH = 32800;
     private final static int VALID_LARGE_FILE_SIZE = 32768;
     private final static int VALID_SMALL_FILE_SIZE = 64;
     private final static int INVALID_LARGE_FILE_SIZE = 1000000;
@@ -51,9 +60,12 @@ public class TestPutUDP {
     private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
     private final static int LONG_TEST_TIMEOUT_PERIOD = 100000;
 
-    private UDPTestServer server;
     private TestRunner runner;
-    private ArrayBlockingQueue<DatagramPacket> recvQueue;
+    private int port;
+    private static final TransportProtocol PROTOCOL = TransportProtocol.UDP;
+    private EventServer eventServer;
+    private BlockingQueue<ByteArrayMessage> messages;
+
 
     // Test Data
     private final static String[] EMPTY_FILE = { "" };
@@ -61,15 +73,18 @@ public class TestPutUDP {
 
     @Before
     public void setup() throws Exception {
-        createTestServer(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE);
         runner = TestRunners.newTestRunner(PutUDP.class);
         runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS);
+        port = NetworkUtils.getAvailableUdpPort();
+        createTestServer(UDP_SERVER_ADDRESS, port, VALID_LARGE_FILE_SIZE);
     }
 
-    private void createTestServer(final String address, final int port, final 
int recvQueueSize) throws Exception {
-        recvQueue = new ArrayBlockingQueue<DatagramPacket>(recvQueueSize);
-        server = new UDPTestServer(InetAddress.getByName(address), port, 
recvQueue);
-        server.startServer();
+    private void createTestServer(final String address, final int port, final 
int frameSize) throws Exception {
+        messages = new LinkedBlockingQueue<>();
+        final byte[] delimiter = DELIMITER.getBytes(CHARSET);
+        NettyEventServerFactory serverFactory = new 
ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port, 
PROTOCOL, delimiter, frameSize, messages);
+        serverFactory.setSocketReceiveBuffer(MAX_FRAME_LENGTH);
+        eventServer = serverFactory.getEventServer();
     }
 
     @After
@@ -79,22 +94,12 @@ public class TestPutUDP {
     }
 
     private void removeTestServer() {
-        if (server != null) {
-            server.shutdownServer();
-            server = null;
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
         }
     }
 
-    private byte[] getPacketData(final DatagramPacket packet) {
-        final int length = packet.getLength();
-        final byte[] packetData = packet.getData();
-        final byte[] resizedPacketData = new byte[length];
-        for (int i = 0; i < length; i++) {
-            resizedPacketData[i] = packetData[i];
-        }
-        return resizedPacketData;
-    }
-
     @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
     public void testValidFiles() throws Exception {
         configureProperties(UDP_SERVER_ADDRESS, true);
@@ -120,8 +125,8 @@ public class TestPutUDP {
         checkInputQueueIsEmpty();
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
-    public void testlargeValidFile() throws Exception {
+    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+    public void testLargeValidFile() throws Exception {
         configureProperties(UDP_SERVER_ADDRESS, true);
         final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
         sendTestData(testData);
@@ -130,19 +135,13 @@ public class TestPutUDP {
     }
 
     @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
-    public void testlargeInvalidFile() throws Exception {
+    public void testLargeInvalidFile() throws Exception {
         configureProperties(UDP_SERVER_ADDRESS, true);
         String[] testData = createContent(INVALID_LARGE_FILE_SIZE);
         sendTestData(testData);
         checkRelationships(0, testData.length);
         checkNoDataReceived();
         checkInputQueueIsEmpty();
-
-        // Check that the processor recovers and can send the next valid file
-        testData = createContent(VALID_LARGE_FILE_SIZE);
-        sendTestData(testData);
-        checkReceivedAllData(testData);
-        checkInputQueueIsEmpty();
     }
 
     @Ignore("This test is failing intermittently as documented in NIFI-4288")
@@ -165,16 +164,16 @@ public class TestPutUDP {
         checkInputQueueIsEmpty();
     }
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+    @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
     public void testReconfiguration() throws Exception {
         configureProperties(UDP_SERVER_ADDRESS, true);
         sendTestData(VALID_FILES);
         checkReceivedAllData(VALID_FILES);
-        reset(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE);
+        reset(UDP_SERVER_ADDRESS, port, MAX_FRAME_LENGTH);
         configureProperties(UDP_SERVER_ADDRESS, true);
         sendTestData(VALID_FILES);
         checkReceivedAllData(VALID_FILES);
-        reset(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE);
+        reset(UDP_SERVER_ADDRESS, port, MAX_FRAME_LENGTH);
         configureProperties(UDP_SERVER_ADDRESS, true);
         sendTestData(VALID_FILES);
         checkReceivedAllData(VALID_FILES);
@@ -190,15 +189,17 @@ public class TestPutUDP {
         checkInputQueueIsEmpty();
     }
 
-    private void reset(final String address, final int port, final int 
recvQueueSize) throws Exception {
+    private void reset(final String address, final int port, final int 
frameSize) throws Exception {
         runner.clearTransferState();
         removeTestServer();
-        createTestServer(address, port, recvQueueSize);
+        createTestServer(address, port, frameSize);
     }
 
     private void configureProperties(final String host, final boolean 
expectValid) {
         runner.setProperty(PutUDP.HOSTNAME, host);
-        runner.setProperty(PutUDP.PORT, 
Integer.toString(server.getLocalPort()));
+        runner.setProperty(PutUDP.PORT, Integer.toString(port));
+        runner.setProperty(PutUDP.MAX_SOCKET_SEND_BUFFER_SIZE, "40000B");
+
         if (expectValid) {
             runner.assertValid();
         } else {
@@ -231,7 +232,7 @@ public class TestPutUDP {
 
     private void checkNoDataReceived() throws Exception {
         Thread.sleep(DATA_WAIT_PERIOD);
-        assertNull(recvQueue.poll());
+        assertNull("Unexpected extra messages found", messages.poll());
     }
 
     private void checkInputQueueIsEmpty() {
@@ -244,18 +245,17 @@ public class TestPutUDP {
 
     private void checkReceivedAllData(final String[] sentData, final int 
iterations) throws Exception {
         // check each sent FlowFile was successfully sent and received.
         for (String item : sentData) {
             for (int i = 0; i < iterations; i++) {
-                DatagramPacket packet = recvQueue.take();
+                ByteArrayMessage packet = messages.take();
                 assertNotNull(packet);
-                assertArrayEquals(item.getBytes(), getPacketData(packet));
+                assertArrayEquals(item.getBytes(CHARSET), packet.getMessage());
             }
         }
 
         runner.assertTransferCount(PutUDP.REL_SUCCESS, sentData.length * 
iterations);
 
-        // Check that we have no unexpected extra data.
-        assertNull(recvQueue.poll());
+        assertNull("Unexpected extra messages found", messages.poll());
     }
 
     private String[] createContent(final int size) {
@@ -265,6 +265,6 @@ public class TestPutUDP {
             content[i] = CONTENT_CHAR;
         }
 
-        return new String[] { new String(content) };
+        return new String[] { new String(content).concat("\n") };
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
deleted file mode 100644
index 2549ffc..0000000
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import org.apache.nifi.io.socket.SocketUtils;
-
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.net.ServerSocketFactory;
-
-public class TCPTestServer implements Runnable {
-
-    private final InetAddress ipAddress;
-    private final String messageDelimiter;
-    private final ArrayBlockingQueue<List<Byte>> queue;
-    private final AtomicInteger totalConnections = new AtomicInteger();
-    private final boolean closeOnMessageReceived;
-
-    private volatile ServerSocket serverSocket;
-    private volatile Socket connectionSocket;
-    private int port;
-
-    public TCPTestServer(final InetAddress ipAddress, final 
ArrayBlockingQueue<List<Byte>> queue, final String messageDelimiter, final 
boolean closeOnMessageReceived) {
-        this.ipAddress = ipAddress;
-        this.queue = queue;
-        this.messageDelimiter = messageDelimiter;
-        this.closeOnMessageReceived = closeOnMessageReceived;
-    }
-
-    public synchronized void startServer(final ServerSocketFactory 
serverSocketFactory) throws Exception {
-        if (!isServerRunning()) {
-            if (serverSocketFactory == null) {
-                serverSocket = new ServerSocket(0, 0, ipAddress);
-            } else {
-                serverSocket = serverSocketFactory.createServerSocket(0, 0, 
ipAddress);
-            }
-            Thread t = new Thread(this);
-            t.setName(this.getClass().getSimpleName());
-            t.start();
-            port = serverSocket.getLocalPort();
-        }
-    }
-
-    public synchronized void shutdown() {
-        shutdownConnection();
-        shutdownServer();
-    }
-
-    public int getPort(){
-        return port;
-    }
-
-    private synchronized void shutdownServer() {
-        if (isServerRunning()) {
-            SocketUtils.closeQuietly(serverSocket);
-        }
-    }
-
-    private synchronized void shutdownConnection() {
-        if (isConnected()) {
-            SocketUtils.closeQuietly(connectionSocket);
-        }
-    }
-
-    private void storeReceivedMessage(final List<Byte> message) {
-        queue.add(message);
-        if (closeOnMessageReceived) {
-            shutdownConnection();
-        }
-    }
-
-    private boolean isServerRunning() {
-        return serverSocket != null && !serverSocket.isClosed();
-    }
-
-    private boolean isConnected() {
-        return connectionSocket != null && !connectionSocket.isClosed();
-    }
-
-    public int getTotalConnections() {
-        return totalConnections.get();
-    }
-
-    protected boolean isDelimiterPresent(final List<Byte> message) {
-        if (messageDelimiter != null && message.size() >= 
messageDelimiter.length()) {
-            for (int i = 1; i <= messageDelimiter.length(); i++) {
-                if (message.get(message.size() - i) == 
messageDelimiter.charAt(messageDelimiter.length() - i)) {
-                    if (i == messageDelimiter.length()) {
-                        return true;
-                    }
-                } else {
-                    break;
-                }
-            }
-        }
-        return false;
-    }
-
-    protected boolean removeDelimiter(final List<Byte> message) {
-        if (isDelimiterPresent(message)) {
-            final int messageSize = message.size();
-            for (int i = 1; i <= messageDelimiter.length(); i++) {
-                message.remove(messageSize - i);
-            }
-            return true;
-        }
-
-        return false;
-    }
-
-    @Override
-    public void run() {
-        try {
-            while (isServerRunning()) {
-                connectionSocket = serverSocket.accept();
-                totalConnections.incrementAndGet();
-                final InputStream inputStream = 
connectionSocket.getInputStream();
-                while (isConnected()) {
-                    final List<Byte> message = new ArrayList<>();
-                    while (true) {
-                        final int c = inputStream.read();
-                        if (c < 0) {
-                            if (!message.isEmpty()) {
-                                storeReceivedMessage(message);
-                            }
-                            shutdownConnection();
-                            break;
-                        }
-
-                        message.add((byte) c);
-
-                        if (removeDelimiter(message)) {
-                            storeReceivedMessage(message);
-                            break;
-                        }
-                    }
-                }
-            }
-        } catch (Exception e) {
-            // Do Nothing
-        } finally {
-            shutdown();
-        }
-
-    }
-}

Reply via email to