turcsanyip commented on a change in pull request #4689:
URL: https://github.com/apache/nifi/pull/4689#discussion_r531734236



##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/SocketChannelDispatcher.java
##########
@@ -52,61 +53,80 @@
 
     private final EventFactory<E> eventFactory;
     private final ChannelHandlerFactory<E, AsyncChannelDispatcher> 
handlerFactory;
-    private final BlockingQueue<ByteBuffer> bufferPool;
+    private final ByteBufferSource bufferSource;
     private final BlockingQueue<E> events;
     private final ComponentLog logger;
     private final int maxConnections;
+    private final int maxThreadPoolSize;
     private final SSLContext sslContext;
     private final ClientAuth clientAuth;
     private final Charset charset;
 
-    private ExecutorService executor;
+    private ThreadPoolExecutor executor;
     private volatile boolean stopped = false;
     private Selector selector;
     private final BlockingQueue<SelectionKey> keyQueue;
     private final AtomicInteger currentConnections = new AtomicInteger(0);
 
     public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                                    final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final ByteBufferSource bufferSource,
+                                   final BlockingQueue<E> events,
+                                   final ComponentLog logger,
+                                   final int maxConnections,
+                                   final SSLContext sslContext,
+                                   final Charset charset) {
+        this(eventFactory, handlerFactory, bufferSource, events, logger, 
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+    }
+
+    public SocketChannelDispatcher(final EventFactory<E> eventFactory,
+                                   final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
+                                   final ByteBufferSource bufferSource,
                                    final BlockingQueue<E> events,
                                    final ComponentLog logger,
                                    final int maxConnections,
                                    final SSLContext sslContext,
+                                   final ClientAuth clientAuth,
                                    final Charset charset) {
-        this(eventFactory, handlerFactory, bufferPool, events, logger, 
maxConnections, sslContext, ClientAuth.REQUIRED, charset);
+        this(eventFactory, handlerFactory, bufferSource, events, logger, 
maxConnections, maxConnections, sslContext, clientAuth, charset);
     }
 
     public SocketChannelDispatcher(final EventFactory<E> eventFactory,
                                    final ChannelHandlerFactory<E, 
AsyncChannelDispatcher> handlerFactory,
-                                   final BlockingQueue<ByteBuffer> bufferPool,
+                                   final ByteBufferSource bufferSource,
                                    final BlockingQueue<E> events,
                                    final ComponentLog logger,
                                    final int maxConnections,
+                                   final int maxThreadPoolSize,
                                    final SSLContext sslContext,
                                    final ClientAuth clientAuth,
                                    final Charset charset) {
         this.eventFactory = eventFactory;
         this.handlerFactory = handlerFactory;
-        this.bufferPool = bufferPool;
+        this.bufferSource = bufferSource;
         this.events = events;
         this.logger = logger;
         this.maxConnections = maxConnections;
+        this.maxThreadPoolSize = maxThreadPoolSize;
         this.keyQueue = new LinkedBlockingQueue<>(maxConnections);
         this.sslContext = sslContext;
         this.clientAuth = clientAuth;
         this.charset = charset;
-
-        if (bufferPool == null || bufferPool.size() == 0 || bufferPool.size() 
!= maxConnections) {
-            throw new IllegalArgumentException(
-                    "A pool of available ByteBuffers equal to the maximum 
number of connections is required");
-        }
     }
 
     @Override
     public void open(final InetAddress nicAddress, final int port, final int 
maxBufferSize) throws IOException {
+        final InetSocketAddress inetSocketAddress = new 
InetSocketAddress(nicAddress, port);
+
         stopped = false;
-        executor = Executors.newFixedThreadPool(maxConnections);
+        executor = new ThreadPoolExecutor(
+                maxThreadPoolSize,
+                maxThreadPoolSize,
+                60L,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new 
BasicThreadFactory.Builder().namingPattern(inetSocketAddress.toString() + 
"-dispatcher-%d").build());

Review comment:
       These are "worker" threads started by the dispatcher so "-worker" would 
be the appropriate suffix.
   
   Regarding the prefix: It is more typical to use the processor's (class)name 
+ id like in case of the dispatcher thread: 
https://github.com/apache/nifi/blob/fe950131c35756dabd677fb21b436a1f85eabced/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java#L198
   
   It would be better to link the dispatcher and the worker threads together 
via a same naming convention (eg. to make troubleshooting easier).
   I see that the processor's attributes are not available here right now so 
this change would not be straightforward.
   I would suggest reverting it back to the original logic (default 
pool-x-thread-y) and solving it when the other listen processors are getting 
fixed.

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/dispatcher/ByteBufferSource.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.dispatcher;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Manages byte buffers for the dispatchers.
+ */
+public interface ByteBufferSource {
+
+    /**
+     * @return Returns for a buffer for usage. The buffer can be pooled or 
created on demand depending on the implementation.

Review comment:
       "Returns a buffer..."

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -83,10 +87,37 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
+    public static final PropertyDescriptor MAX_RECV_THREAD_POOL_SIZE = new 
PropertyDescriptor.Builder()
+            .name("max-receiving-threads")
+            .displayName("Max Number of Receiving Message Handler Threads")
+            .description(
+                    "The maximum number of threads might be available for 
handling receiving messages ready all the time. " +
+                    "Cannot be bigger than the \"Max Number of TCP 
Connections\". " +
+                    "If not set, the value of \"Max Number of TCP 
Connections\" will be used.")
+            .addValidator(StandardValidators.createLongValidator(1, 65535, 
true))
+            .required(false)
+            .build();
+
+    protected static final PropertyDescriptor POOL_RECV_BUFFERS = new 
PropertyDescriptor.Builder()
+            .name("pool-receive-buffers")
+            .displayName("Pool Receive Buffers")
+            .description(
+                    "When turned on, the processor uses pre-populated pool of 
buffers when receiving messages. " +
+                    "This is prepared during initialisation of the processor. 
" +
+                    "With high value of Max Number of TCP Connections and 
Receive Buffer Size this strategy might allocate significant amount of memory! 
" +
+                    "When turned off, the byte buffers will be created ad 
hoc.")

Review comment:
       "will be created on demand and be destroyed after use."




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to