simonbence commented on a change in pull request #4689:
URL: https://github.com/apache/nifi/pull/4689#discussion_r532059585



##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
##########
@@ -52,61 +53,80 @@
 
     private final EventFactory<E> eventFactory;
     private final ChannelHandlerFactory<E, AsyncChannelDispatcher> 
handlerFactory;
-    private final BlockingQueue<ByteBuffer> bufferPool;
+    private final ByteBufferSource bufferSource;
     private final BlockingQueue<E> events;
     private final ComponentLog logger;
     private final int maxConnections;
+    private final int maxThreadPoolSize;
     private final SSLContext sslContext;
     private final ClientAuth clientAuth;
     private final Charset charset;
 
-    private ExecutorService executor;
+    private ThreadPoolExecutor executor;
     private volatile boolean stopped = false;
     private Selector selector;
     private final BlockingQueue<SelectionKey> keyQueue;
     private final AtomicInteger currentConnections = new AtomicInteger(0);
 
     public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                                    final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final ByteBufferSource bufferSource,
+                                   final BlockingQueue<E> events,
+                                   final ComponentLog logger,
+                                   final int maxConnections,
+                                   final SSLContext sslContext,
+                                   final Charset charset) {
+        this(eventFactory, handlerFactory, bufferSource, events, logger, 
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+    }
+
+    public SocketChannelDispatcher(final EventFactory<E> eventFactory,
+                                   final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
+                                   final ByteBufferSource bufferSource,
                                    final BlockingQueue<E> events,
                                    final ComponentLog logger,
                                    final int maxConnections,
                                    final SSLContext sslContext,
+                                   final ClientAuth clientAuth,
                                    final Charset charset) {
-        this(eventFactory, handlerFactory, bufferPool, events, logger, 
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+        this(eventFactory, handlerFactory, bufferSource, events, logger, 
maxConnections, maxConnections, sslContext, clientAuth, charset);
     }
 
     public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                                    final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final ByteBufferSource bufferSource,
                                    final BlockingQueue<E> events,
                                    final ComponentLog logger,
                                    final int maxConnections,
+                                   final int maxThreadPoolSize,
                                    final SSLContext sslContext,
                                    final ClientAuth clientAuth,
                                    final Charset charset) {
         this.eventFactory = eventFactory;
         this.handlerFactory = handlerFactory;
-        this.bufferPool = bufferPool;
+        this.bufferSource = bufferSource;
         this.events = events;
         this.logger = logger;
         this.maxConnections = maxConnections;
+        this.maxThreadPoolSize = maxThreadPoolSize;
         this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
         this.sslContext = sslContext;
         this.clientAuth = clientAuth;
         this.charset = charset;
-
-        if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() 
!= maxConnections) {
-            throw new IllegalArgumentException(
-                    "A pool of available ByteBuffers equal to the maximum 
number of connections is required");
-        }
     }
 
     @Override
     public void open(final InetAddress nicAddress, final int port, final int 
maxBufferSize) throws IOException {
+        final InetSocketAddress inetSocketAddress = new 
InetSocketAddress(nicAddress, port);
+
         stopped = false;
-        executor = Executors.newFixedThreadPool(maxConnections);
+        executor = new ThreadPoolExecutor(
+                maxThreadPoolSize,
+                maxThreadPoolSize,
+                60L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new 
BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + 
"-dispatcher-%d").build());

Review comment:
       Yes, the decision was made due to the processor attributes are not 
available here. Also, I decided to change the default value to make it easier 
to follow the threads in case multiple ListenTCP processors are in the flow. By 
adding the address, it is easy to identify which processor it is related to 
which adding the class does not help. I am aware of that this is not following 
the general convention but due to the possible multiplication of the pools it 
makes sense to me. Adding the class as prefix as well might work but it might 
be too long. For now I went with a common ground type of solution and replaced 
dispatcher postfix with worker. For the prefix I intend to keep it, expect you 
insist to change.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to