This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 39f8a00  NIFI-8039 Adding properties to ListenTCP in order to allow 
refining behaviour under higher load; Refining thread pool for better scalability
39f8a00 is described below

commit 39f8a008d4bde2024eb804ff7017542a6e86b572
Author: Bence Simon <[email protected]>
AuthorDate: Wed Nov 25 21:53:01 2020 +0100

    NIFI-8039 Adding properties to ListenTCP in order to allow refining behaviour 
under higher load; Refining thread pool for better scalability
    
    NIFI-8039 Review findings; refining thread pool to be able to scale down 
properly when not under load
    NIFI-8039 Answers to PR comments
    
    This closes #4689.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../apache/nifi/processors/beats/ListenBeats.java  | 28 ++++----
 .../handler/TestBeatsSocketChannelHandler.java     |  9 +--
 .../util/listen/AbstractListenEventProcessor.java  | 20 +-----
 .../util/listen/dispatcher/ByteBufferFactory.java  | 37 ++++++++++
 .../util/listen/dispatcher/ByteBufferPool.java     | 55 +++++++++++++++
 .../util/listen/dispatcher/ByteBufferSource.java   | 39 ++++++++++
 .../dispatcher/DatagramChannelDispatcher.java      | 22 ++----
 .../listen/dispatcher/SocketChannelDispatcher.java | 82 +++++++++++++---------
 .../processors/lumberjack/ListenLumberjack.java    | 28 ++++----
 .../handler/ITLumberjackSocketChannelHandler.java  | 36 +++++-----
 .../nifi/processors/standard/ListenRELP.java       | 28 ++++----
 .../nifi/processors/standard/ListenSyslog.java     | 62 ++++++++--------
 .../apache/nifi/processors/standard/ListenTCP.java | 80 +++++++++++++++++----
 .../apache/nifi/processors/standard/ListenUDP.java |  7 +-
 .../nifi/processors/standard/ListenUDPRecord.java  |  7 +-
 .../relp/handler/TestRELPSocketChannelHandler.java |  9 +--
 16 files changed, 366 insertions(+), 183 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
index eab3e76..bedca27 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
@@ -16,17 +16,6 @@
  */
 package org.apache.nifi.processors.beats;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import javax.net.ssl.SSLContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import 
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
 import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"listen", "beats", "tcp", "logs"})
 @CapabilityDescription("Listens for messages sent by libbeat compatible 
clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', 
writing its JSON formatted payload " +
@@ -147,7 +149,7 @@ public class ListenBeats extends 
AbstractListenEventBatchingProcessor<BeatsEvent
         final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
 
         // initialize the buffer pool based on max number of connections and 
the buffer size
-        final BlockingQueue<ByteBuffer> bufferPool = 
createBufferPool(maxConnections, bufferSize);
+        final ByteBufferSource byteBufferSource = new 
ByteBufferPool(maxConnections, bufferSize);
 
         // if an SSLContextService was provided then create an SSLContext to 
pass down to the dispatcher
         SSLContext sslContext = null;
@@ -161,7 +163,7 @@ public class ListenBeats extends 
AbstractListenEventBatchingProcessor<BeatsEvent
         }
 
         // if we decide to support SSL then get the context and pass it in here
-        return new SocketChannelDispatcher<>(eventFactory, handlerFactory, 
bufferPool, events,
+        return new SocketChannelDispatcher<>(eventFactory, handlerFactory, 
byteBufferSource, events,
             getLogger(), maxConnections, sslContext, clientAuth, charSet);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
index 44f73a2..7c6c1cb 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
@@ -33,6 +33,8 @@ import javax.xml.bind.DatatypeConverter;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.Event;
@@ -51,7 +53,7 @@ import org.mockito.Mockito;
 public class TestBeatsSocketChannelHandler {
     private EventFactory<TestEvent> eventFactory;
     private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> 
channelHandlerFactory;
-    private BlockingQueue<ByteBuffer> byteBuffers;
+    private ByteBufferSource byteBufferSource;
     private BlockingQueue<TestEvent> events;
     private ComponentLog logger = Mockito.mock(ComponentLog.class);
     private int maxConnections;
@@ -64,8 +66,7 @@ public class TestBeatsSocketChannelHandler {
         eventFactory = new TestEventHolderFactory();
         channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>();
 
-        byteBuffers = new LinkedBlockingQueue<>();
-        byteBuffers.add(ByteBuffer.allocate(4096));
+        byteBufferSource = new ByteBufferPool(1, 4096);
 
         events = new LinkedBlockingQueue<>();
         logger = Mockito.mock(ComponentLog.class);
@@ -74,7 +75,7 @@ public class TestBeatsSocketChannelHandler {
         sslContext = null;
         charset = StandardCharsets.UTF_8;
 
-        dispatcher = new SocketChannelDispatcher<>(eventFactory, 
channelHandlerFactory, byteBuffers, events, logger,
+        dispatcher = new SocketChannelDispatcher<>(eventFactory, 
channelHandlerFactory, byteBufferSource, events, logger,
                 maxConnections, sslContext, charset);
 
     }
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index 20df868..edba61a 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -16,8 +16,6 @@
  */
 package org.apache.nifi.processor.util.listen;
 
-import static 
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -36,7 +34,6 @@ import org.apache.nifi.processor.util.listen.event.Event;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
-import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -47,6 +44,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
+
 /**
  * An abstract processor to extend from when listening for events over a 
channel. This processor
  * will start a ChannelDispatcher, and optionally a ChannelResponseDispatcher, 
in a background
@@ -229,21 +228,6 @@ public abstract class AbstractListenEventProcessor<E 
extends Event> extends Abst
     }
 
     /**
-     * Creates a pool of ByteBuffers with the given size.
-     *
-     * @param poolSize the number of buffers to initialize the pool with
-     * @param bufferSize the size of each buffer
-     * @return a blocking queue with size equal to poolSize and each buffer 
equal to bufferSize
-     */
-    protected BlockingQueue<ByteBuffer> createBufferPool(final int poolSize, 
final int bufferSize) {
-        final LinkedBlockingQueue<ByteBuffer> bufferPool = new 
LinkedBlockingQueue<>(poolSize);
-        for (int i = 0; i < poolSize; i++) {
-            bufferPool.offer(ByteBuffer.allocate(bufferSize));
-        }
-        return bufferPool;
-    }
-
-    /**
      * If pollErrorQueue is true, the error queue will be checked first and 
event will be
      * returned from the error queue if available.
      *
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java
new file mode 100644
index 0000000..86a035a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import java.nio.ByteBuffer;
+
+public class ByteBufferFactory implements ByteBufferSource {
+    private final int bufferSize;
+
+    public ByteBufferFactory(final int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+
+    @Override
+    public ByteBuffer acquire() {
+        return ByteBuffer.allocate(bufferSize);
+    }
+
+    @Override
+    public void release(final ByteBuffer byteBuffer) {
+        // nothing to do
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java
new file mode 100644
index 0000000..42f476f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferPool.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ByteBufferPool implements ByteBufferSource {
+
+    private final BlockingQueue<ByteBuffer> pool;
+
+    public ByteBufferPool(final int poolSize, final int bufferSize) {
+        if (poolSize <= 0) {
+            throw new IllegalArgumentException("A pool of available 
ByteBuffers is required");
+        }
+
+        this.pool = new LinkedBlockingQueue<>(poolSize);
+
+        for (int i = 0; i < poolSize; i++) {
+            pool.offer(ByteBuffer.allocate(bufferSize));
+        }
+    }
+
+    @Override
+    public ByteBuffer acquire() {
+        final ByteBuffer buffer = pool.poll();
+        buffer.clear();
+        buffer.mark();
+        return buffer;
+    }
+
+    @Override
+    public void release(final ByteBuffer byteBuffer) {
+        try {
+            pool.put(byteBuffer);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java
new file mode 100644
index 0000000..002827d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Manages byte buffers for the dispatchers.
+ */
+public interface ByteBufferSource {
+
+    /**
+     * @return a buffer ready for use. The buffer can be pooled or created 
on demand depending on the implementation.
+     * If the source is unable to provide an instance, it returns {@code 
null} instead.
+     */
+    ByteBuffer acquire();
+
+    /**
+     * By calling this method the client releases the buffer. It might be 
reused by the handler and must not be used
+     * by this client any more.
+     *
+     * @param byteBuffer The byte buffer the client acquired previously.
+     */
+    void release(ByteBuffer byteBuffer);
+}
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
index 69a1998..cd721e6 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.java
@@ -43,7 +43,7 @@ import java.util.concurrent.BlockingQueue;
 public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> 
implements ChannelDispatcher {
 
     private final EventFactory<E> eventFactory;
-    private final BlockingQueue<ByteBuffer> bufferPool;
+    private final ByteBufferSource bufferSource;
     private final EventQueue<E> events;
     private final ComponentLog logger;
     private final String sendingHost;
@@ -54,28 +54,24 @@ public class DatagramChannelDispatcher<E extends 
Event<DatagramChannel>> impleme
     private volatile boolean stopped = false;
 
     public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
-                                     final BlockingQueue<ByteBuffer> 
bufferPool,
+                                     final ByteBufferSource bufferSource,
                                      final BlockingQueue<E> events,
                                      final ComponentLog logger) {
-        this(eventFactory, bufferPool, events, logger, null, null);
+        this(eventFactory, bufferSource, events, logger, null, null);
     }
 
     public DatagramChannelDispatcher(final EventFactory<E> eventFactory,
-                                     final BlockingQueue<ByteBuffer> 
bufferPool,
+                                     final ByteBufferSource bufferSource,
                                      final BlockingQueue<E> events,
                                      final ComponentLog logger,
                                      final String sendingHost,
                                      final Integer sendingPort) {
         this.eventFactory = eventFactory;
-        this.bufferPool = bufferPool;
+        this.bufferSource = bufferSource;
         this.logger = logger;
         this.sendingHost = sendingHost;
         this.sendingPort = sendingPort;
         this.events = new EventQueue<>(events, logger);
-
-        if (bufferPool == null || bufferPool.size() == 0) {
-            throw new IllegalArgumentException("A pool of available 
ByteBuffers is required");
-        }
     }
 
     @Override
@@ -110,7 +106,7 @@ public class DatagramChannelDispatcher<E extends 
Event<DatagramChannel>> impleme
 
     @Override
     public void run() {
-        final ByteBuffer buffer = bufferPool.poll();
+        final ByteBuffer buffer = bufferSource.acquire();
         while (!stopped) {
             try {
                 int selected = selector.select();
@@ -155,11 +151,7 @@ public class DatagramChannelDispatcher<E extends 
Event<DatagramChannel>> impleme
         }
 
         if (buffer != null) {
-            try {
-                bufferPool.put(buffer);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-            }
+            bufferSource.release(buffer);
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
index d0be256..41e1a70 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
@@ -16,6 +16,17 @@
  */
 package org.apache.nifi.processor.util.listen.dispatcher;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.processor.util.listen.event.EventFactory;
+import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
+import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
+import org.apache.nifi.security.util.ClientAuth;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -29,20 +40,10 @@ import java.nio.channels.SocketChannel;
 import java.nio.charset.Charset;
 import java.util.Iterator;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.security.util.ClientAuth;
 
 /**
  * Accepts Socket connections on the given port and creates a handler for each 
connection to
@@ -52,15 +53,16 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
 
     private final EventFactory<E> eventFactory;
     private final ChannelHandlerFactory<E, AsyncChannelDispatcher> 
handlerFactory;
-    private final BlockingQueue<ByteBuffer> bufferPool;
+    private final ByteBufferSource bufferSource;
     private final BlockingQueue<E> events;
     private final ComponentLog logger;
     private final int maxConnections;
+    private final int maxThreadPoolSize;
     private final SSLContext sslContext;
     private final ClientAuth clientAuth;
     private final Charset charset;
 
-    private ExecutorService executor;
+    private ThreadPoolExecutor executor;
     private volatile boolean stopped = false;
     private Selector selector;
     private final BlockingQueue<SelectionKey> keyQueue;
@@ -68,45 +70,63 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
 
     public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                                    final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final ByteBufferSource bufferSource,
+                                   final BlockingQueue<E> events,
+                                   final ComponentLog logger,
+                                   final int maxConnections,
+                                   final SSLContext sslContext,
+                                   final Charset charset) {
+        this(eventFactory, handlerFactory, bufferSource, events, logger, 
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+    }
+
+    public SocketChannelDispatcher(final EventFactory<E> eventFactory,
+                                   final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
+                                   final ByteBufferSource bufferSource,
                                    final BlockingQueue<E> events,
                                    final ComponentLog logger,
                                    final int maxConnections,
                                    final SSLContext sslContext,
+                                   final ClientAuth clientAuth,
                                    final Charset charset) {
-        this(eventFactory, handlerFactory, bufferPool, events, logger, 
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+        this(eventFactory, handlerFactory, bufferSource, events, logger, 
maxConnections, maxConnections, sslContext, clientAuth, charset);
     }
 
     public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                                    final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final ByteBufferSource bufferSource,
                                    final BlockingQueue<E> events,
                                    final ComponentLog logger,
                                    final int maxConnections,
+                                   final int maxThreadPoolSize,
                                    final SSLContext sslContext,
                                    final ClientAuth clientAuth,
                                    final Charset charset) {
         this.eventFactory = eventFactory;
         this.handlerFactory = handlerFactory;
-        this.bufferPool = bufferPool;
+        this.bufferSource = bufferSource;
         this.events = events;
         this.logger = logger;
         this.maxConnections = maxConnections;
+        this.maxThreadPoolSize = maxThreadPoolSize;
         this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
         this.sslContext = sslContext;
         this.clientAuth = clientAuth;
         this.charset = charset;
-
-        if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() 
!= maxConnections) {
-            throw new IllegalArgumentException(
-                    "A pool of available ByteBuffers equal to the maximum 
number of connections is required");
-        }
     }
 
     @Override
     public void open(final InetAddress nicAddress, final int port, final int 
maxBufferSize) throws IOException {
+        final InetSocketAddress inetSocketAddress = new 
InetSocketAddress(nicAddress, port);
+
         stopped = false;
-        executor = Executors.newFixedThreadPool(maxConnections);
+        executor = new ThreadPoolExecutor(
+                maxThreadPoolSize,
+                maxThreadPoolSize,
+                60L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new 
BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + 
"-worker-%d").build());
+        executor.allowCoreThreadTimeOut(true);
 
         final ServerSocketChannel serverSocketChannel = 
ServerSocketChannel.open();
         serverSocketChannel.configureBlocking(false);
@@ -120,7 +140,7 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
             }
         }
 
-        serverSocketChannel.socket().bind(new InetSocketAddress(nicAddress, 
port));
+        serverSocketChannel.socket().bind(inetSocketAddress);
 
         selector = Selector.open();
         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
@@ -161,9 +181,7 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
                             SelectionKey readKey = 
socketChannel.register(selector, SelectionKey.OP_READ);
 
                             // Prepare the byte buffer for the reads, clear it 
out
-                            ByteBuffer buffer = bufferPool.poll();
-                            buffer.clear();
-                            buffer.mark();
+                            ByteBuffer buffer = bufferSource.acquire();
 
                             // If we have an SSLContext then create an 
SSLEngine for the channel
                             SSLSocketChannel sslSocketChannel = null;
@@ -265,13 +283,9 @@ public class SocketChannelDispatcher<E extends 
Event<SocketChannel>> implements
 
     @Override
     public void completeConnection(SelectionKey key) {
-        // connection is done. Return the buffer to the pool
-        SocketChannelAttachment attachment = (SocketChannelAttachment) 
key.attachment();
-        try {
-            bufferPool.put(attachment.getByteBuffer());
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        }
+        // connection is done. Releasing buffer
+        final SocketChannelAttachment attachment = (SocketChannelAttachment) 
key.attachment();
+        bufferSource.release(attachment.getByteBuffer());
         currentConnections.decrementAndGet();
     }
 
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
index 7ff65ee..9ce0a76 100644
--- 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java
@@ -17,17 +17,6 @@
 package org.apache.nifi.processors.lumberjack;
 
 import com.google.gson.Gson;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import javax.net.ssl.SSLContext;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import 
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
 import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
 @Deprecated
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"listen", "lumberjack", "tcp", "logs"})
@@ -135,7 +137,7 @@ public class ListenLumberjack extends 
AbstractListenEventBatchingProcessor<Lumbe
         final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
 
         // initialize the buffer pool based on max number of connections and 
the buffer size
-        final BlockingQueue<ByteBuffer> bufferPool = 
createBufferPool(maxConnections, bufferSize);
+        final ByteBufferSource byteBufferSource = new 
ByteBufferPool(maxConnections, bufferSize);
 
         // if an SSLContextService was provided then create an SSLContext to 
pass down to the dispatcher
         SSLContext sslContext = null;
@@ -145,7 +147,7 @@ public class ListenLumberjack extends 
AbstractListenEventBatchingProcessor<Lumbe
         }
 
         // if we decide to support SSL then get the context and pass it in here
-        return new SocketChannelDispatcher<>(eventFactory, handlerFactory, 
bufferPool, events,
+        return new SocketChannelDispatcher<>(eventFactory, handlerFactory, 
byteBufferSource, events,
             getLogger(), maxConnections, sslContext, charSet);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
index 0ef1e41..45e74b6 100644
--- 
a/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
+++ 
b/nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/test/java/org/apache/nifi/processors/lumberjack/handler/ITLumberjackSocketChannelHandler.java
@@ -16,21 +16,10 @@
  */
 package org.apache.nifi.processors.lumberjack.handler;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import javax.net.ssl.SSLContext;
-import javax.xml.bind.DatatypeConverter;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.Event;
@@ -43,11 +32,25 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.net.ssl.SSLContext;
+import javax.xml.bind.DatatypeConverter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 @SuppressWarnings("deprecation")
 public class ITLumberjackSocketChannelHandler {
     private EventFactory<TestEvent> eventFactory;
     private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> 
channelHandlerFactory;
-    private BlockingQueue<ByteBuffer> byteBuffers;
+    private ByteBufferSource byteBufferSource;
     private BlockingQueue<TestEvent> events;
     private ComponentLog logger = Mockito.mock(ComponentLog.class);
     private int maxConnections;
@@ -60,8 +63,7 @@ public class ITLumberjackSocketChannelHandler {
         eventFactory = new TestEventHolderFactory();
         channelHandlerFactory = new LumberjackSocketChannelHandlerFactory<>();
 
-        byteBuffers = new LinkedBlockingQueue<>();
-        byteBuffers.add(ByteBuffer.allocate(4096));
+        byteBufferSource = new ByteBufferPool(1, 4096);
 
         events = new LinkedBlockingQueue<>();
         logger = Mockito.mock(ComponentLog.class);
@@ -70,7 +72,7 @@ public class ITLumberjackSocketChannelHandler {
         sslContext = null;
         charset = StandardCharsets.UTF_8;
 
-        dispatcher = new SocketChannelDispatcher<>(eventFactory, 
channelHandlerFactory, byteBuffers, events, logger,
+        dispatcher = new SocketChannelDispatcher<>(eventFactory, 
channelHandlerFactory, byteBufferSource, events, logger,
                 maxConnections, sslContext, charset);
 
     }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
index 09a68eb..53477a6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
@@ -16,17 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import javax.net.ssl.SSLContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -45,6 +34,8 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import 
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
 import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -61,6 +52,17 @@ import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"listen", "relp", "tcp", "logs"})
 @CapabilityDescription("Listens for RELP messages being sent to a given port 
over TCP. Each message will be " +
@@ -135,7 +137,7 @@ public class ListenRELP extends 
AbstractListenEventBatchingProcessor<RELPEvent>
         final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
 
         // initialize the buffer pool based on max number of connections and 
the buffer size
-        final BlockingQueue<ByteBuffer> bufferPool = 
createBufferPool(maxConnections, bufferSize);
+        final ByteBufferSource byteBufferSource = new 
ByteBufferPool(maxConnections, bufferSize);
 
         // if an SSLContextService was provided then create an SSLContext to 
pass down to the dispatcher
         SSLContext sslContext = null;
@@ -150,7 +152,7 @@ public class ListenRELP extends 
AbstractListenEventBatchingProcessor<RELPEvent>
         }
 
         // if we decide to support SSL then get the context and pass it in here
-        return new SocketChannelDispatcher<>(eventFactory, handlerFactory, 
bufferPool, events,
+        return new SocketChannelDispatcher<>(eventFactory, handlerFactory, 
byteBufferSource, events,
                 getLogger(), maxConnections, sslContext, clientAuth, charSet);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
index 77a9a28..4cc7b33 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java
@@ -16,28 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import static 
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectableChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.net.ssl.SSLContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -62,6 +40,8 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
@@ -77,6 +57,28 @@ import org.apache.nifi.syslog.attributes.SyslogAttributes;
 import org.apache.nifi.syslog.events.SyslogEvent;
 import org.apache.nifi.syslog.parsers.SyslogParser;
 
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
+
 @SupportsBatching
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"syslog", "listen", "udp", "tcp", "logs"})
@@ -202,7 +204,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
     private volatile ChannelDispatcher channelDispatcher;
     private volatile SyslogParser parser;
-    private volatile BlockingQueue<ByteBuffer> bufferPool;
+    private volatile ByteBufferSource byteBufferSource;
     private volatile BlockingQueue<RawSyslogEvent> syslogEvents;
     private final BlockingQueue<RawSyslogEvent> errorEvents = new 
LinkedBlockingQueue<>();
     private volatile byte[] messageDemarcatorBytes; //it is only the array 
reference that is volatile - not the contents.
@@ -303,11 +305,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             maxConnections = 
context.getProperty(MAX_CONNECTIONS).asLong().intValue();
         }
 
-        bufferPool = new LinkedBlockingQueue<>(maxConnections);
-        for (int i = 0; i < maxConnections; i++) {
-            bufferPool.offer(ByteBuffer.allocate(bufferSize));
-        }
-
+        byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
         parser = new SyslogParser(Charset.forName(charSet));
         syslogEvents = new LinkedBlockingQueue<>(maxMessageQueueSize);
 
@@ -319,7 +317,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
 
         // create either a UDP or TCP reader and call open() to bind to the 
given port
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        channelDispatcher = createChannelReader(context, protocol, bufferPool, 
syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
+        channelDispatcher = createChannelReader(context, protocol, 
byteBufferSource, syslogEvents, maxConnections, sslContextService, 
Charset.forName(charSet));
         channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize);
 
         final Thread readerThread = new Thread(channelDispatcher);
@@ -334,14 +332,14 @@ public class ListenSyslog extends AbstractSyslogProcessor 
{
     }
 
     // visible for testing to be overridden and provide a mock 
ChannelDispatcher if desired
-    protected ChannelDispatcher createChannelReader(final ProcessContext 
context, final String protocol, final BlockingQueue<ByteBuffer> bufferPool,
+    protected ChannelDispatcher createChannelReader(final ProcessContext 
context, final String protocol, final ByteBufferSource byteBufferSource,
                                                     final 
BlockingQueue<RawSyslogEvent> events, final int maxConnections,
                                                     final SSLContextService 
sslContextService, final Charset charset) throws IOException {
 
         final EventFactory<RawSyslogEvent> eventFactory = new 
RawSyslogEventFactory();
 
         if (UDP_VALUE.getValue().equals(protocol)) {
-            return new DatagramChannelDispatcher(eventFactory, bufferPool, 
events, getLogger());
+            return new DatagramChannelDispatcher(eventFactory, 
byteBufferSource, events, getLogger());
         } else {
             // if an SSLContextService was provided then create an SSLContext 
to pass down to the dispatcher
             SSLContext sslContext = null;
@@ -354,7 +352,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
             }
 
             final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, 
AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
-            return new SocketChannelDispatcher(eventFactory, handlerFactory, 
bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, 
charset);
+            return new SocketChannelDispatcher(eventFactory, handlerFactory, 
byteBufferSource, events, getLogger(), maxConnections, sslContext, clientAuth, 
charset);
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
index 8359221..7aac899 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
@@ -16,18 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import javax.net.ssl.SSLContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -40,8 +28,12 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
 import 
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
 import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferFactory;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -53,6 +45,18 @@ import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
 @SupportsBatching
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"listen", "tcp", "tls", "ssl"})
@@ -83,10 +87,37 @@ public class ListenTCP extends 
AbstractListenEventBatchingProcessor<StandardEven
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
+    public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max-receiving-threads")
+            .displayName("Max Number of Receiving Message Handler Threads")
+            .description(
+                    "The maximum number of threads might be available for 
handling receiving messages ready all the time. " +
+                    "Cannot be bigger than the \"Max Number of TCP 
Connections\". " +
+                    "If not set, the value of \"Max Number of TCP 
Connections\" will be used.")
+            .addValidator(StandardValidators.createLongValidator(1, 65535, 
true))
+            .required(false)
+            .build();
+
+    protected static final PropertyDescriptor POOL_RECV_BUFFERS = new 
PropertyDescriptor.Builder()
+            .name("pool-receive-buffers")
+            .displayName("Pool Receive Buffers")
+            .description(
+                    "When turned on, the processor uses pre-populated pool of 
buffers when receiving messages. " +
+                    "This is prepared during initialisation of the processor. 
" +
+                    "With high value of Max Number of TCP Connections and 
Receive Buffer Size this strategy might allocate significant amount of memory! 
" +
+                    "When turned off, the byte buffers will be created on 
demand and be destroyed after use.")
+            .required(true)
+            .defaultValue("True")
+            .allowableValues("True", "False")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     @Override
     protected List<PropertyDescriptor> getAdditionalProperties() {
         return Arrays.asList(
                 MAX_CONNECTIONS,
+                MAX_RECV_THREAD_POOL_SIZE,
+                POOL_RECV_BUFFERS,
                 SSL_CONTEXT_SERVICE,
                 CLIENT_AUTH
         );
@@ -105,6 +136,20 @@ public class ListenTCP extends 
AbstractListenEventBatchingProcessor<StandardEven
                     .valid(false).subject("Client Auth").build());
         }
 
+        final int maxConnections = 
validationContext.getProperty(MAX_CONNECTIONS).asInteger();
+
+        if (validationContext.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()) {
+            final int maxPoolSize = 
validationContext.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger();
+
+            if (maxPoolSize > maxConnections) {
+                results.add(new ValidationResult.Builder()
+                        .explanation("\"" + 
MAX_RECV_THREAD_POOL_SIZE.getDisplayName() + "\" cannot be bigger than \"" + 
MAX_CONNECTIONS.getDisplayName() + "\"")
+                        .valid(false)
+                        .subject(MAX_RECV_THREAD_POOL_SIZE.getDisplayName())
+                        .build());
+            }
+        }
+
         return results;
     }
 
@@ -113,11 +158,17 @@ public class ListenTCP extends 
AbstractListenEventBatchingProcessor<StandardEven
             throws IOException {
 
         final int maxConnections = 
context.getProperty(MAX_CONNECTIONS).asInteger();
+        final int maxThreadPoolSize = 
context.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()
+                ? context.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger()
+                : maxConnections;
+
         final int bufferSize = 
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
 
         // initialize the buffer pool based on max number of connections and 
the buffer size
-        final BlockingQueue<ByteBuffer> bufferPool = 
createBufferPool(maxConnections, bufferSize);
+        final ByteBufferSource byteBufferSource = 
context.getProperty(POOL_RECV_BUFFERS).asBoolean()
+                ? new ByteBufferPool(maxConnections, bufferSize)
+                : new ByteBufferFactory(bufferSize);
 
         // if an SSLContextService was provided then create an SSLContext to 
pass down to the dispatcher
         SSLContext sslContext = null;
@@ -132,7 +183,8 @@ public class ListenTCP extends 
AbstractListenEventBatchingProcessor<StandardEven
 
         final EventFactory<StandardEvent> eventFactory = new 
StandardEventFactory();
         final ChannelHandlerFactory<StandardEvent<SocketChannel>, 
AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
-        return new SocketChannelDispatcher(eventFactory, handlerFactory, 
bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, 
charSet);
+        return new SocketChannelDispatcher(eventFactory, handlerFactory, 
byteBufferSource, events, getLogger(), maxConnections,
+                maxThreadPoolSize, sslContext, clientAuth, charSet);
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
index c8ffecb..76795f0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDP.java
@@ -33,6 +33,8 @@ import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import 
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -42,7 +44,6 @@ import 
org.apache.nifi.processor.util.listen.event.StandardEventFactory;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -124,9 +125,9 @@ public class ListenUDP extends 
AbstractListenEventBatchingProcessor<StandardEven
         final String sendingHost = 
context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
         final Integer sendingHostPort = 
context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
         final Integer bufferSize = 
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final BlockingQueue<ByteBuffer> bufferPool = 
createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
+        final ByteBufferSource byteBufferSource = new 
ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
         final EventFactory<StandardEvent> eventFactory = new 
StandardEventFactory();
-        return new DatagramChannelDispatcher<>(eventFactory, bufferPool, 
events, getLogger(), sendingHost, sendingHostPort);
+        return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, 
events, getLogger(), sendingHost, sendingHostPort);
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
index 6a723ea..2a20d5d 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java
@@ -39,6 +39,8 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.EventFactory;
@@ -58,7 +60,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -214,9 +215,9 @@ public class ListenUDPRecord extends 
AbstractListenEventProcessor<StandardEvent>
         final String sendingHost = 
context.getProperty(SENDING_HOST).evaluateAttributeExpressions().getValue();
         final Integer sendingHostPort = 
context.getProperty(SENDING_HOST_PORT).evaluateAttributeExpressions().asInteger();
         final Integer bufferSize = 
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final BlockingQueue<ByteBuffer> bufferPool = 
createBufferPool(context.getMaxConcurrentTasks(), bufferSize);
+        final ByteBufferSource byteBufferSource = new 
ByteBufferPool(context.getMaxConcurrentTasks(), bufferSize);
         final EventFactory<StandardEvent> eventFactory = new 
StandardEventFactory();
-        return new DatagramChannelDispatcher<>(eventFactory, bufferPool, 
events, getLogger(), sendingHost, sendingHostPort);
+        return new DatagramChannelDispatcher<>(eventFactory, byteBufferSource, 
events, getLogger(), sendingHost, sendingHostPort);
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
index 5bfaca7..d7b7d5c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.standard.relp.handler;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
+import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.Event;
@@ -48,7 +50,7 @@ public class TestRELPSocketChannelHandler {
 
     private EventFactory<TestEvent> eventFactory;
     private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> 
channelHandlerFactory;
-    private BlockingQueue<ByteBuffer> byteBuffers;
+    private ByteBufferSource byteBufferSource;
     private BlockingQueue<TestEvent> events;
     private ComponentLog logger = Mockito.mock(ComponentLog.class);
     private int maxConnections;
@@ -61,8 +63,7 @@ public class TestRELPSocketChannelHandler {
         eventFactory = new TestEventHolderFactory();
         channelHandlerFactory = new RELPSocketChannelHandlerFactory<>();
 
-        byteBuffers = new LinkedBlockingQueue<>();
-        byteBuffers.add(ByteBuffer.allocate(4096));
+        byteBufferSource = new ByteBufferPool(1, 4096);
 
         events = new LinkedBlockingQueue<>();
         logger = Mockito.mock(ComponentLog.class);
@@ -71,7 +72,7 @@ public class TestRELPSocketChannelHandler {
         sslContext = null;
         charset = StandardCharsets.UTF_8;
 
-        dispatcher = new SocketChannelDispatcher<>(eventFactory, 
channelHandlerFactory, byteBuffers, events, logger,
+        dispatcher = new SocketChannelDispatcher<>(eventFactory, 
channelHandlerFactory, byteBufferSource, events, logger,
                 maxConnections, sslContext, charset);
 
     }

Reply via email to