This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 28421e3df0 NIFI-12370 Fixed Distributed Map Cache Client Service Shutdown
28421e3df0 is described below

commit 28421e3df02ef3256328ad11886bf945bdbb2366
Author: exceptionfactory <exceptionfact...@apache.org>
AuthorDate: Tue Nov 14 20:30:57 2023 -0600

    NIFI-12370 Fixed Distributed Map Cache Client Service Shutdown
    
    - Moved EventLoopGroup from CacheClientChannelPoolFactory to DistributedCacheClient to enable closing the EventLoopGroup after closing the ChannelPool
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8027.
---
 .../cache/client/CacheClientChannelPoolFactory.java       | 12 ++++--------
 .../distributed/cache/client/DistributedCacheClient.java  | 15 ++++++++++++---
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
index 82a529ecd9..905de8dc52 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
@@ -20,13 +20,11 @@ import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.pool.ChannelHealthChecker;
 import io.netty.channel.pool.ChannelPool;
 import io.netty.channel.pool.ChannelPoolHandler;
 import io.netty.channel.pool.FixedChannelPool;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.util.concurrent.DefaultThreadFactory;
 import 
org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
 import org.apache.nifi.remote.VersionNegotiatorFactory;
 import org.apache.nifi.ssl.SSLContextService;
@@ -39,10 +37,9 @@ import java.time.Duration;
  * methods.  Cache clients include the NiFi services {@link 
DistributedSetCacheClientService}
  * and {@link DistributedMapCacheClientService}.
  */
-public class CacheClientChannelPoolFactory {
+class CacheClientChannelPoolFactory {
 
     private static final int MAX_PENDING_ACQUIRES = 1024;
-    private static final boolean DAEMON_THREAD_ENABLED = true;
 
     private int maxConnections = Runtime.getRuntime().availableProcessors() * 
2;
 
@@ -64,7 +61,7 @@ public class CacheClientChannelPoolFactory {
      * @param sslContextService the SSL context (if any) associated with 
requests to the service; if not specified,
      *                          communications will not be encrypted
      * @param factory           creator of object used to broker the version 
of the distributed cache protocol with the service
-     * @param poolName          channel pool name, used for threads name prefix
+     * @param eventLoopGroup    Netty Event Loop Group providing threads for 
managing connections
      * @return a channel pool object from which {@link Channel} objects may be 
obtained
      */
     public ChannelPool createChannelPool(final String hostname,
@@ -72,12 +69,11 @@ public class CacheClientChannelPoolFactory {
                                          final int timeoutMillis,
                                          final SSLContextService 
sslContextService,
                                          final VersionNegotiatorFactory 
factory,
-                                         final String poolName) {
+                                         final EventLoopGroup eventLoopGroup) {
         final SSLContext sslContext = (sslContextService == null) ? null : 
sslContextService.createContext();
-        final EventLoopGroup group = new NioEventLoopGroup(new 
DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED));
         final Bootstrap bootstrap = new Bootstrap();
         final CacheClientChannelInitializer initializer = new 
CacheClientChannelInitializer(sslContext, factory, 
Duration.ofMillis(timeoutMillis), Duration.ofMillis(timeoutMillis));
-        bootstrap.group(group)
+        bootstrap.group(eventLoopGroup)
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
                 .remoteAddress(hostname, port)
                 .channel(NioSocketChannel.class);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
index 8ebf9bacbd..6a65e26ed1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
@@ -17,7 +17,10 @@
 package org.apache.nifi.distributed.cache.client;
 
 import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.pool.ChannelPool;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter;
 import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
 import org.apache.nifi.remote.VersionNegotiatorFactory;
@@ -31,11 +34,15 @@ import java.io.IOException;
  */
 public class DistributedCacheClient {
 
+    private static final boolean DAEMON_THREAD_ENABLED = true;
+
     /**
      * The pool of network connections used to service client requests.
      */
     private final ChannelPool channelPool;
 
+    private final EventLoopGroup eventLoopGroup;
+
     /**
      * Constructor.
      *
@@ -53,9 +60,10 @@ public class DistributedCacheClient {
                                      final SSLContextService sslContextService,
                                      final VersionNegotiatorFactory factory,
                                      final String identifier) {
-        String poolName = String.format("%s[%s]", getClass().getSimpleName(), 
identifier);
+        final String poolName = String.format("%s[%s]", 
getClass().getSimpleName(), identifier);
+        this.eventLoopGroup = new NioEventLoopGroup(new 
DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED));
         this.channelPool = new 
CacheClientChannelPoolFactory().createChannelPool(
-                hostname, port, timeoutMillis, sslContextService, factory, 
poolName);
+                hostname, port, timeoutMillis, sslContextService, factory, 
eventLoopGroup);
     }
 
     /**
@@ -76,9 +84,10 @@ public class DistributedCacheClient {
     }
 
     /**
-     * Shutdown {@link ChannelPool} cleanly.
+     * Close Channel Pool and then gracefully shut down the supporting Event Loop Group
      */
     protected void closeChannelPool() {
         channelPool.close();
+        eventLoopGroup.shutdownGracefully();
     }
 }

Reply via email to