This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 40d7d88bc0 NIFI-10232 Set write and idle timeouts in DistributedCacheClient
40d7d88bc0 is described below

commit 40d7d88bc0b1513604a021f9dad662906b624b70
Author: Nathan Gough <[email protected]>
AuthorDate: Mon Jul 18 18:39:53 2022 -0400

    NIFI-10232 Set write and idle timeouts in DistributedCacheClient
    
    - Added OnShutdown annotation to Cache Server and Cache Client Service methods
    
    This closes #6221
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../cache/client/CacheClientChannelInitializer.java       | 15 ++++++++++++++-
 .../cache/client/CacheClientChannelPoolFactory.java       |  3 ++-
 .../distributed/cache/client/DistributedCacheClient.java  |  2 +-
 .../cache/client/DistributedMapCacheClientService.java    | 15 +++++----------
 .../distributed/cache/server/DistributedCacheServer.java  |  2 ++
 5 files changed, 24 insertions(+), 13 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
index 6c8f334197..4f891f448b 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
@@ -20,11 +20,16 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.handler.timeout.WriteTimeoutHandler;
+import org.apache.nifi.event.transport.netty.CloseContextIdleStateHandler;
 import org.apache.nifi.remote.VersionNegotiator;
 import org.apache.nifi.remote.VersionNegotiatorFactory;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Bootstrap a new netty connection.  This performs the socket handshake used 
by the nifi distributed set /
@@ -43,15 +48,20 @@ public class CacheClientChannelInitializer extends 
ChannelInitializer<Channel> {
      */
     private final VersionNegotiatorFactory versionNegotiatorFactory;
 
+    private final Duration idleTimeout;
+    private final Duration writeTimeout;
+
     /**
      * Constructor.
      *
      * @param sslContext the secure context (if any) to be associated with the 
channel
      * @param factory    creator of object used to broker the version of the 
distributed cache protocol with the service
      */
-    public CacheClientChannelInitializer(final SSLContext sslContext, final 
VersionNegotiatorFactory factory) {
+    public CacheClientChannelInitializer(final SSLContext sslContext, final 
VersionNegotiatorFactory factory, final Duration idleTimeout, final Duration 
writeTimeout) {
         this.sslContext = sslContext;
         this.versionNegotiatorFactory = factory;
+        this.idleTimeout = idleTimeout;
+        this.writeTimeout = writeTimeout;
     }
 
     @Override
@@ -66,7 +76,10 @@ public class CacheClientChannelInitializer extends 
ChannelInitializer<Channel> {
         }
 
         final VersionNegotiator versionNegotiator = 
versionNegotiatorFactory.create();
+        channelPipeline.addFirst(new 
IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), 
idleTimeout.getSeconds(), TimeUnit.SECONDS));
+        channelPipeline.addLast(new 
WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
         channelPipeline.addLast(new CacheClientHandshakeHandler(channel, 
versionNegotiator));
         channelPipeline.addLast(new CacheClientRequestHandler());
+        channelPipeline.addLast(new CloseContextIdleStateHandler());
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
index 98a7e9f8b4..7499c68d52 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
@@ -31,6 +31,7 @@ import org.apache.nifi.remote.VersionNegotiatorFactory;
 import org.apache.nifi.ssl.SSLContextService;
 
 import javax.net.ssl.SSLContext;
+import java.time.Duration;
 
 /**
  * Factory for construction of new {@link ChannelPool}, used by distributed 
cache clients to invoke service
@@ -71,7 +72,7 @@ public class CacheClientChannelPoolFactory {
         final SSLContext sslContext = (sslContextService == null) ? null : 
sslContextService.createContext();
         final EventLoopGroup group = new NioEventLoopGroup();
         final Bootstrap bootstrap = new Bootstrap();
-        final CacheClientChannelInitializer initializer = new 
CacheClientChannelInitializer(sslContext, factory);
+        final CacheClientChannelInitializer initializer = new 
CacheClientChannelInitializer(sslContext, factory, 
Duration.ofMillis(timeoutMillis), Duration.ofMillis(timeoutMillis));
         bootstrap.group(group)
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
                 .remoteAddress(hostname, port)
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
index 60b7bf40f2..b2bfac78e8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
@@ -65,7 +65,7 @@ public class DistributedCacheClient {
     protected void invoke(final OutboundAdapter outboundAdapter, final 
InboundAdapter inboundAdapter) throws IOException {
         final Channel channel = 
channelPool.acquire().syncUninterruptibly().getNow();
         try {
-            final CacheClientRequestHandler requestHandler = 
(CacheClientRequestHandler) channel.pipeline().last();
+            final CacheClientRequestHandler requestHandler = 
channel.pipeline().get(CacheClientRequestHandler.class);
             requestHandler.invoke(channel, outboundAdapter, inboundAdapter);
         } finally {
             channelPool.release(channel).syncUninterruptibly();
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index c62b63bb3d..f671997e14 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -21,7 +21,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -116,21 +116,16 @@ public class DistributedMapCacheClientService extends 
AbstractControllerService
                 versionNegotiatorFactory);
     }
 
+    @OnShutdown
     @OnDisabled
     public void onDisabled() throws IOException {
-        getLogger().debug("Disabling Map Cache Client Service");
-        this.cacheClient.close();
+        if (cacheClient != null) {
+            this.cacheClient.close();
+        }
         this.versionNegotiatorFactory = null;
         this.cacheClient = null;
     }
 
-    @OnStopped
-    public void onStopped() throws IOException {
-        if (isEnabled()) {
-            onDisabled();
-        }
-    }
-
     @Override
     public <K, V> boolean putIfAbsent(final K key, final V value, final 
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws 
IOException {
         final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 9d480964ea..573d95454c 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -99,6 +100,7 @@ public abstract class DistributedCacheServer extends 
AbstractControllerService {
         }
     }
 
+    @OnShutdown
     @OnDisabled
     public void shutdownServer() throws IOException {
         if (cacheServer != null) {

Reply via email to