This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 40d7d88bc0 NIFI-10232 Set write and idle timeouts in
DistributedCacheClient
40d7d88bc0 is described below
commit 40d7d88bc0b1513604a021f9dad662906b624b70
Author: Nathan Gough <[email protected]>
AuthorDate: Mon Jul 18 18:39:53 2022 -0400
NIFI-10232 Set write and idle timeouts in DistributedCacheClient
- Added OnShutdown annotation to Cache Server and Cache Client Service
methods
This closes #6221
Signed-off-by: David Handermann <[email protected]>
---
.../cache/client/CacheClientChannelInitializer.java | 15 ++++++++++++++-
.../cache/client/CacheClientChannelPoolFactory.java | 3 ++-
.../distributed/cache/client/DistributedCacheClient.java | 2 +-
.../cache/client/DistributedMapCacheClientService.java | 15 +++++----------
.../distributed/cache/server/DistributedCacheServer.java | 2 ++
5 files changed, 24 insertions(+), 13 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
index 6c8f334197..4f891f448b 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
@@ -20,11 +20,16 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.handler.timeout.WriteTimeoutHandler;
+import org.apache.nifi.event.transport.netty.CloseContextIdleStateHandler;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.VersionNegotiatorFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
/**
* Bootstrap a new netty connection. This performs the socket handshake used
by the nifi distributed set /
@@ -43,15 +48,20 @@ public class CacheClientChannelInitializer extends
ChannelInitializer<Channel> {
*/
private final VersionNegotiatorFactory versionNegotiatorFactory;
+ private final Duration idleTimeout;
+ private final Duration writeTimeout;
+
/**
* Constructor.
*
* @param sslContext the secure context (if any) to be associated with the
channel
* @param factory creator of object used to broker the version of the
distributed cache protocol with the service
*/
- public CacheClientChannelInitializer(final SSLContext sslContext, final
VersionNegotiatorFactory factory) {
+ public CacheClientChannelInitializer(final SSLContext sslContext, final
VersionNegotiatorFactory factory, final Duration idleTimeout, final Duration
writeTimeout) {
this.sslContext = sslContext;
this.versionNegotiatorFactory = factory;
+ this.idleTimeout = idleTimeout;
+ this.writeTimeout = writeTimeout;
}
@Override
@@ -66,7 +76,10 @@ public class CacheClientChannelInitializer extends
ChannelInitializer<Channel> {
}
final VersionNegotiator versionNegotiator =
versionNegotiatorFactory.create();
+ channelPipeline.addFirst(new
IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(),
idleTimeout.getSeconds(), TimeUnit.SECONDS));
+ channelPipeline.addLast(new
WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
channelPipeline.addLast(new CacheClientHandshakeHandler(channel,
versionNegotiator));
channelPipeline.addLast(new CacheClientRequestHandler());
+ channelPipeline.addLast(new CloseContextIdleStateHandler());
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
index 98a7e9f8b4..7499c68d52 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java
@@ -31,6 +31,7 @@ import org.apache.nifi.remote.VersionNegotiatorFactory;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
+import java.time.Duration;
/**
* Factory for construction of new {@link ChannelPool}, used by distributed
cache clients to invoke service
@@ -71,7 +72,7 @@ public class CacheClientChannelPoolFactory {
final SSLContext sslContext = (sslContextService == null) ? null :
sslContextService.createContext();
final EventLoopGroup group = new NioEventLoopGroup();
final Bootstrap bootstrap = new Bootstrap();
- final CacheClientChannelInitializer initializer = new
CacheClientChannelInitializer(sslContext, factory);
+ final CacheClientChannelInitializer initializer = new
CacheClientChannelInitializer(sslContext, factory,
Duration.ofMillis(timeoutMillis), Duration.ofMillis(timeoutMillis));
bootstrap.group(group)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
.remoteAddress(hostname, port)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
index 60b7bf40f2..b2bfac78e8 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java
@@ -65,7 +65,7 @@ public class DistributedCacheClient {
protected void invoke(final OutboundAdapter outboundAdapter, final
InboundAdapter inboundAdapter) throws IOException {
final Channel channel =
channelPool.acquire().syncUninterruptibly().getNow();
try {
- final CacheClientRequestHandler requestHandler =
(CacheClientRequestHandler) channel.pipeline().last();
+ final CacheClientRequestHandler requestHandler =
channel.pipeline().get(CacheClientRequestHandler.class);
requestHandler.invoke(channel, outboundAdapter, inboundAdapter);
} finally {
channelPool.release(channel).syncUninterruptibly();
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index c62b63bb3d..f671997e14 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -21,7 +21,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@@ -116,21 +116,16 @@ public class DistributedMapCacheClientService extends
AbstractControllerService
versionNegotiatorFactory);
}
+ @OnShutdown
@OnDisabled
public void onDisabled() throws IOException {
- getLogger().debug("Disabling Map Cache Client Service");
- this.cacheClient.close();
+ if (cacheClient != null) {
+ this.cacheClient.close();
+ }
this.versionNegotiatorFactory = null;
this.cacheClient = null;
}
- @OnStopped
- public void onStopped() throws IOException {
- if (isEnabled()) {
- onDisabled();
- }
- }
-
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final
Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws
IOException {
final byte[] bytesKey = CacheClientSerde.serialize(key, keySerializer);
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 9d480964ea..573d95454c 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
@@ -99,6 +100,7 @@ public abstract class DistributedCacheServer extends
AbstractControllerService {
}
}
+ @OnShutdown
@OnDisabled
public void shutdownServer() throws IOException {
if (cacheServer != null) {