simonbence commented on a change in pull request #4689: URL: https://github.com/apache/nifi/pull/4689#discussion_r532059585
########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java ########## @@ -52,61 +53,80 @@ private final EventFactory<E> eventFactory; private final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory; - private final BlockingQueue<ByteBuffer> bufferPool; + private final ByteBufferSource bufferSource; private final BlockingQueue<E> events; private final ComponentLog logger; private final int maxConnections; + private final int maxThreadPoolSize; private final SSLContext sslContext; private final ClientAuth clientAuth; private final Charset charset; - private ExecutorService executor; + private ThreadPoolExecutor executor; private volatile boolean stopped = false; private Selector selector; private final BlockingQueue<SelectionKey> keyQueue; private final AtomicInteger currentConnections = new AtomicInteger(0); public SocketChannelDispatcher(final EventFactory<E> eventFactory, final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, - final BlockingQueue<ByteBuffer> bufferPool, + final ByteBufferSource bufferSource, + final BlockingQueue<E> events, + final ComponentLog logger, + final int maxConnections, + final SSLContext sslContext, + final Charset charset) { + this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset); + } + + public SocketChannelDispatcher(final EventFactory<E> eventFactory, + final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, + final ByteBufferSource bufferSource, final BlockingQueue<E> events, final ComponentLog logger, final int maxConnections, final SSLContext sslContext, + final ClientAuth clientAuth, final Charset charset) { - this(eventFactory, handlerFactory, bufferPool, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset); + this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, maxConnections, sslContext, clientAuth, charset); } public SocketChannelDispatcher(final EventFactory<E> eventFactory, final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, - final BlockingQueue<ByteBuffer> bufferPool, + final ByteBufferSource bufferSource, final BlockingQueue<E> events, final ComponentLog logger, final int maxConnections, + final int maxThreadPoolSize, final SSLContext sslContext, final ClientAuth clientAuth, final Charset charset) { this.eventFactory = eventFactory; this.handlerFactory = handlerFactory; - this.bufferPool = bufferPool; + this.bufferSource = bufferSource; this.events = events; this.logger = logger; this.maxConnections = maxConnections; + this.maxThreadPoolSize = maxThreadPoolSize; this.keyQueue = new LinkedBlockingQueue<>(maxConnections); this.sslContext = sslContext; this.clientAuth = clientAuth; this.charset = charset; - - if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) { - throw new IllegalArgumentException( - "A pool of available ByteBuffers equal to the maximum number of connections is required"); - } } @Override public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { + final InetSocketAddress inetSocketAddress = new InetSocketAddress(nicAddress, port); + stopped = false; - executor = Executors.newFixedThreadPool(maxConnections); + executor = new ThreadPoolExecutor( + maxThreadPoolSize, + maxThreadPoolSize, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + "-dispatcher-%d").build()); Review comment: Yes, the decision was made due to the processor attributes are not available here. Also, I decided to change the default value to make it easier to follow the threads in case multiple ListenTCP processors are in the flow. By adding the address, it is easy to identify which processor it is related to which adding the class does not help. I am aware of that this is not following the general convention but due to the possible multiplication of the pools it makes sense to me. Adding the class as prefix as well might work but it might be too long. For now I went with a common ground type of solution and replaced dispatcher postfix with worker. For the prefix I intend to keep it, expect you insist to change. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org