This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 288031f5f6 NIFI-11210 Added read timeout to 
DistributedMapCacheClientService
288031f5f6 is described below

commit 288031f5f699ed952d5a53a45fc59f417dc05eaa
Author: Paul Grey <[email protected]>
AuthorDate: Fri Feb 24 17:52:05 2023 -0500

    NIFI-11210 Added read timeout to DistributedMapCacheClientService
    
    This closes #6994
    
    Co-authored-by: David Handermann <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../event/transport/netty/NettyEventServer.java    |   3 +-
 .../client/CacheClientChannelInitializer.java      |   2 +-
 .../cache/client/CacheClientRequestHandler.java    |  16 ++-
 .../client/DistributedMapCacheClientService.java   |   3 +-
 .../client/NettyDistributedMapCacheClient.java     |  15 ++-
 .../client/adapter/BooleanInboundAdapter.java      |   2 +-
 .../map/TestDistributedMapCacheClientService.java  | 119 +++++++++++++++++++++
 7 files changed, 153 insertions(+), 7 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
index fb8aae59de..16ccd9d327 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
@@ -79,7 +79,8 @@ class NettyEventServer implements EventServer {
             final String message = String.format("Close channel interrupted: 
Remote Address [%s]", channel.remoteAddress());
             throw new EventException(message, e);
         } finally {
-            group.shutdownGracefully(shutdownQuietPeriod.toMillis(), 
shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS).syncUninterruptibly();
+            group.shutdownGracefully(shutdownQuietPeriod.toMillis(), 
shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)
+                    .awaitUninterruptibly(shutdownTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
         }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
index 13e50bd9f7..0c4c47ece2 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
@@ -79,7 +79,7 @@ public class CacheClientChannelInitializer extends 
ChannelInitializer<Channel> {
         channelPipeline.addFirst(new 
IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), 
idleTimeout.getSeconds(), TimeUnit.SECONDS));
         channelPipeline.addLast(new 
WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
         channelPipeline.addLast(new CacheClientHandshakeHandler(channel, 
versionNegotiator, writeTimeout.toMillis()));
-        channelPipeline.addLast(new CacheClientRequestHandler());
+        channelPipeline.addLast(new 
CacheClientRequestHandler(writeTimeout.toMillis()));
         channelPipeline.addLast(new CloseContextIdleStateHandler());
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
index 50eeb0de4a..7104768c09 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
@@ -27,6 +27,8 @@ import 
org.apache.nifi.distributed.cache.client.adapter.NullInboundAdapter;
 import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
 
 import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The {@link io.netty.channel.ChannelHandler} responsible for sending client 
requests and receiving server responses
@@ -44,6 +46,15 @@ public class CacheClientRequestHandler extends 
ChannelInboundHandlerAdapter {
      */
     private ChannelPromise channelPromise;
 
+    /**
+     * THe network timeout associated with the connection
+     */
+    private final long timeoutMillis;
+
+    public CacheClientRequestHandler(final long timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
         final ByteBuf byteBuf = (ByteBuf) msg;
@@ -97,7 +108,10 @@ public class CacheClientRequestHandler extends 
ChannelInboundHandlerAdapter {
             this.inboundAdapter = inboundAdapter;
             channelPromise = channel.newPromise();
             
channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes()));
-            channelPromise.awaitUninterruptibly();
+            final boolean completed = 
channelPromise.awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS);
+            if (!completed) {
+                throw new SocketTimeoutException(String.format("Request 
invocation timeout [%d ms] to remote address [%s]", timeoutMillis, 
channel.remoteAddress()));
+            }
             this.inboundAdapter = new NullInboundAdapter();
             if (channelPromise.cause() != null) {
                 throw new IOException("Request invocation failed", 
channelPromise.cause());
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index f671997e14..d46c79feb4 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -113,7 +113,8 @@ public class DistributedMapCacheClientService extends 
AbstractControllerService
                 context.getProperty(PORT).asInteger(),
                 
context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
                 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class),
-                versionNegotiatorFactory);
+                versionNegotiatorFactory,
+                getLogger());
     }
 
     @OnShutdown
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
index ed25053319..5be429c1b3 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
@@ -27,12 +27,14 @@ import 
org.apache.nifi.distributed.cache.client.adapter.ValueInboundAdapter;
 import org.apache.nifi.distributed.cache.client.adapter.VoidInboundAdapter;
 import org.apache.nifi.distributed.cache.operations.MapOperation;
 import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.remote.VersionNegotiatorFactory;
 import org.apache.nifi.ssl.SSLContextService;
 
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
@@ -40,6 +42,7 @@ import java.util.Set;
  * communication services.
  */
 public class NettyDistributedMapCacheClient extends DistributedCacheClient {
+    private final ComponentLog log;
 
     /**
      * Constructor.
@@ -50,14 +53,18 @@ public class NettyDistributedMapCacheClient extends 
DistributedCacheClient {
      * @param sslContextService the SSL context (if any) associated with 
requests to the service; if not specified,
      *                          communications will not be encrypted
      * @param factory           creator of object used to broker the version 
of the distributed cache protocol with the service
+     * @param log               Component Log from instantiating Services
      */
     public NettyDistributedMapCacheClient(
             final String hostname,
             final int port,
             final int timeoutMillis,
             final SSLContextService sslContextService,
-            final VersionNegotiatorFactory factory) {
+            final VersionNegotiatorFactory factory,
+            final ComponentLog log
+    ) {
         super(hostname, port, timeoutMillis, sslContextService, factory);
+        this.log = Objects.requireNonNull(log, "Component Log required");
     }
 
     /**
@@ -312,7 +319,11 @@ public class NettyDistributedMapCacheClient extends 
DistributedCacheClient {
      * @throws IOException if unable to communicate with the remote instance
      */
     public void close() throws IOException {
-        invoke(new OutboundAdapter().write(MapOperation.CLOSE.value()), new 
VoidInboundAdapter());
+        try {
+            invoke(new OutboundAdapter().write(MapOperation.CLOSE.value()), 
new VoidInboundAdapter());
+        } catch (final Exception e) {
+            log.warn("Sending close command failed: closing channel", e);
+        }
         closeChannelPool();
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java
index eb64c88362..191c3c2225 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java
@@ -48,7 +48,7 @@ public class BooleanInboundAdapter implements InboundAdapter {
      * @return the service method response value
      */
     public boolean getResult() {
-        return result;
+        return (result != null) && result;
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java
new file mode 100644
index 0000000000..465b76cb42
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import 
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import 
org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler;
+import 
org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestDecoder;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestDistributedMapCacheClientService {
+    private static final String LOCALHOST = "127.0.0.1";
+
+    private static final int MAX_REQUEST_LENGTH = 64;
+
+    private final Serializer<String> serializer = new StringSerializer();
+
+    private int port;
+
+    private EventServer server;
+
+    private TestRunner runner;
+
+    @BeforeEach
+    public void setRunner() throws UnknownHostException {
+        runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        port = NetworkUtils.getAvailableTcpPort();
+
+        final InetAddress serverAddress = InetAddress.getByName(LOCALHOST);
+        final NettyEventServerFactory serverFactory = new 
NettyEventServerFactory(serverAddress, port, TransportProtocol.TCP);
+        
serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+        
serverFactory.setShutdownTimeout(ShutdownQuietPeriod.QUICK.getDuration());
+        final ComponentLog log = runner.getLogger();
+        final VersionNegotiator versionNegotiator = new 
StandardVersionNegotiator(ProtocolVersion.V3.value());
+
+        serverFactory.setHandlerSupplier(() -> Arrays.asList(
+                new CacheVersionResponseEncoder(),
+                new MapCacheRequestDecoder(log, MAX_REQUEST_LENGTH, 
MapOperation.values()),
+                new CacheVersionRequestHandler(log, versionNegotiator)
+        ));
+        server = serverFactory.getEventServer();
+    }
+
+    @AfterEach
+    public void shutdownServer() {
+        server.shutdown();
+    }
+
+    /**
+     * Service will hold request long enough for client timeout to be 
triggered, thus causing the request to fail.
+     */
+    @Test
+    public void testClientTimeoutOnServerNetworkFailure() throws 
InitializationException {
+        final String clientId = 
DistributedMapCacheClientService.class.getSimpleName();
+        final DistributedMapCacheClientService clientService = new 
DistributedMapCacheClientService();
+
+        runner.addControllerService(clientId, clientService);
+        runner.setProperty(clientService, 
DistributedMapCacheClientService.HOSTNAME, LOCALHOST);
+        runner.setProperty(clientService, 
DistributedMapCacheClientService.PORT, String.valueOf(port));
+        runner.setProperty(clientService, 
DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms");
+        runner.enableControllerService(clientService);
+        runner.assertValid();
+
+        try {
+            assertThrows(SocketTimeoutException.class, () -> 
clientService.put("key", "value", serializer, serializer));
+        } finally {
+            runner.disableControllerService(clientService);
+        }
+    }
+
+    private static class StringSerializer implements Serializer<String> {
+        @Override
+        public void serialize(final String value, final OutputStream output) 
throws IOException {
+            output.write(value.getBytes(StandardCharsets.UTF_8));
+        }
+    }
+}

Reply via email to