turcsanyip commented on a change in pull request #4689: URL: https://github.com/apache/nifi/pull/4689#discussion_r532445464
########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java ########## @@ -52,61 +53,80 @@ private final EventFactory<E> eventFactory; private final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory; - private final BlockingQueue<ByteBuffer> bufferPool; + private final ByteBufferSource bufferSource; private final BlockingQueue<E> events; private final ComponentLog logger; private final int maxConnections; + private final int maxThreadPoolSize; private final SSLContext sslContext; private final ClientAuth clientAuth; private final Charset charset; - private ExecutorService executor; + private ThreadPoolExecutor executor; private volatile boolean stopped = false; private Selector selector; private final BlockingQueue<SelectionKey> keyQueue; private final AtomicInteger currentConnections = new AtomicInteger(0); public SocketChannelDispatcher(final EventFactory<E> eventFactory, final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, - final BlockingQueue<ByteBuffer> bufferPool, + final ByteBufferSource bufferSource, + final BlockingQueue<E> events, + final ComponentLog logger, + final int maxConnections, + final SSLContext sslContext, + final Charset charset) { + this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset); + } + + public SocketChannelDispatcher(final EventFactory<E> eventFactory, + final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, + final ByteBufferSource bufferSource, final BlockingQueue<E> events, final ComponentLog logger, final int maxConnections, final SSLContext sslContext, + final ClientAuth clientAuth, final Charset charset) { - this(eventFactory, handlerFactory, bufferPool, events, logger, maxConnections, sslContext, ClientAuth.REQUIRED, charset); + this(eventFactory, handlerFactory, bufferSource, events, logger, maxConnections, maxConnections, sslContext, clientAuth, charset); } public SocketChannelDispatcher(final EventFactory<E> eventFactory, final ChannelHandlerFactory<E, AsyncChannelDispatcher> handlerFactory, - final BlockingQueue<ByteBuffer> bufferPool, + final ByteBufferSource bufferSource, final BlockingQueue<E> events, final ComponentLog logger, final int maxConnections, + final int maxThreadPoolSize, final SSLContext sslContext, final ClientAuth clientAuth, final Charset charset) { this.eventFactory = eventFactory; this.handlerFactory = handlerFactory; - this.bufferPool = bufferPool; + this.bufferSource = bufferSource; this.events = events; this.logger = logger; this.maxConnections = maxConnections; + this.maxThreadPoolSize = maxThreadPoolSize; this.keyQueue = new LinkedBlockingQueue<>(maxConnections); this.sslContext = sslContext; this.clientAuth = clientAuth; this.charset = charset; - - if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() != maxConnections) { - throw new IllegalArgumentException( - "A pool of available ByteBuffers equal to the maximum number of connections is required"); - } } @Override public void open(final InetAddress nicAddress, final int port, final int maxBufferSize) throws IOException { + final InetSocketAddress inetSocketAddress = new InetSocketAddress(nicAddress, port); + stopped = false; - executor = Executors.newFixedThreadPool(maxConnections); + executor = new ThreadPoolExecutor( + maxThreadPoolSize, + maxThreadPoolSize, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + "-dispatcher-%d").build()); Review comment: It is fine with me. Though the network address is not the best but much better than pool-x-thread-y. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org