turcsanyip commented on a change in pull request #4689:
URL: https://github.com/apache/nifi/pull/4689#discussion_r532445464



##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
##########
@@ -52,61 +53,80 @@
 
     private final EventFactory<E> eventFactory;
     private final ChannelHandlerFactory<E, AsyncChannelDispatcher> 
handlerFactory;
-    private final BlockingQueue<ByteBuffer> bufferPool;
+    private final ByteBufferSource bufferSource;
     private final BlockingQueue<E> events;
     private final ComponentLog logger;
     private final int maxConnections;
+    private final int maxThreadPoolSize;
     private final SSLContext sslContext;
     private final ClientAuth clientAuth;
     private final Charset charset;
 
-    private ExecutorService executor;
+    private ThreadPoolExecutor executor;
     private volatile boolean stopped = false;
     private Selector selector;
     private final BlockingQueue<SelectionKey> keyQueue;
     private final AtomicInteger currentConnections = new AtomicInteger(0);
 
     public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                                    final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final ByteBufferSource bufferSource,
+                                   final BlockingQueue<E> events,
+                                   final ComponentLog logger,
+                                   final int maxConnections,
+                                   final SSLContext sslContext,
+                                   final Charset charset) {
+        this(eventFactory, handlerFactory, bufferSource, events, logger, 
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+    }
+
+    public SocketChannelDispatcher(final EventFactory<E> eventFactory,
+                                   final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
+                                   final ByteBufferSource bufferSource,
                                    final BlockingQueue<E> events,
                                    final ComponentLog logger,
                                    final int maxConnections,
                                    final SSLContext sslContext,
+                                   final ClientAuth clientAuth,
                                    final Charset charset) {
-        this(eventFactory, handlerFactory, bufferPool, events, logger, 
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+        this(eventFactory, handlerFactory, bufferSource, events, logger, 
maxConnections, maxConnections, sslContext, clientAuth, charset);
     }
 
     public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                                    final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final ByteBufferSource bufferSource,
                                    final BlockingQueue<E> events,
                                    final ComponentLog logger,
                                    final int maxConnections,
+                                   final int maxThreadPoolSize,
                                    final SSLContext sslContext,
                                    final ClientAuth clientAuth,
                                    final Charset charset) {
         this.eventFactory = eventFactory;
         this.handlerFactory = handlerFactory;
-        this.bufferPool = bufferPool;
+        this.bufferSource = bufferSource;
         this.events = events;
         this.logger = logger;
         this.maxConnections = maxConnections;
+        this.maxThreadPoolSize = maxThreadPoolSize;
         this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
         this.sslContext = sslContext;
         this.clientAuth = clientAuth;
         this.charset = charset;
-
-        if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() 
!= maxConnections) {
-            throw new IllegalArgumentException(
-                    "A pool of available ByteBuffers equal to the maximum 
number of connections is required");
-        }
     }
 
     @Override
     public void open(final InetAddress nicAddress, final int port, final int 
maxBufferSize) throws IOException {
+        final InetSocketAddress inetSocketAddress = new 
InetSocketAddress(nicAddress, port);
+
         stopped = false;
-        executor = Executors.newFixedThreadPool(maxConnections);
+        executor = new ThreadPoolExecutor(
+                maxThreadPoolSize,
+                maxThreadPoolSize,
+                60L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new 
BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + 
"-dispatcher-%d").build());

Review comment:
       It is fine with me. The network address is not the best choice, but it is 
much better than pool-x-thread-y.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to