This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 853586fedba112fd3cedbb49adacbd555cd63ceb Author: Paul Grey <[email protected]> AuthorDate: Fri Feb 24 17:52:05 2023 -0500 NIFI-11210 Added read timeout to DistributedMapCacheClientService This closes #6994 Co-authored-by: David Handermann <[email protected]> Signed-off-by: David Handermann <[email protected]> --- .../event/transport/netty/NettyEventServer.java | 3 +- .../client/CacheClientChannelInitializer.java | 2 +- .../cache/client/CacheClientRequestHandler.java | 16 ++- .../client/DistributedMapCacheClientService.java | 3 +- .../client/NettyDistributedMapCacheClient.java | 15 ++- .../client/adapter/BooleanInboundAdapter.java | 2 +- .../map/TestDistributedMapCacheClientService.java | 119 +++++++++++++++++++++ 7 files changed, 153 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java index fb8aae59de..16ccd9d327 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java @@ -79,7 +79,8 @@ class NettyEventServer implements EventServer { final String message = String.format("Close channel interrupted: Remote Address [%s]", channel.remoteAddress()); throw new EventException(message, e); } finally { - group.shutdownGracefully(shutdownQuietPeriod.toMillis(), shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS).syncUninterruptibly(); + group.shutdownGracefully(shutdownQuietPeriod.toMillis(), shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS) + .awaitUninterruptibly(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS); } } } diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java index 13e50bd9f7..0c4c47ece2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java @@ -79,7 +79,7 @@ public class CacheClientChannelInitializer extends ChannelInitializer<Channel> { channelPipeline.addFirst(new IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), idleTimeout.getSeconds(), TimeUnit.SECONDS)); channelPipeline.addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS)); channelPipeline.addLast(new CacheClientHandshakeHandler(channel, versionNegotiator, writeTimeout.toMillis())); - channelPipeline.addLast(new CacheClientRequestHandler()); + channelPipeline.addLast(new CacheClientRequestHandler(writeTimeout.toMillis())); channelPipeline.addLast(new CloseContextIdleStateHandler()); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java index 50eeb0de4a..7104768c09 100644 
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java @@ -27,6 +27,8 @@ import org.apache.nifi.distributed.cache.client.adapter.NullInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter; import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.concurrent.TimeUnit; /** * The {@link io.netty.channel.ChannelHandler} responsible for sending client requests and receiving server responses @@ -44,6 +46,15 @@ public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter { */ private ChannelPromise channelPromise; + /** + * The network timeout associated with the connection + */ + private final long timeoutMillis; + + public CacheClientRequestHandler(final long timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { final ByteBuf byteBuf = (ByteBuf) msg; @@ -97,7 +108,10 @@ public class CacheClientRequestHandler extends ChannelInboundHandlerAdapter { this.inboundAdapter = inboundAdapter; channelPromise = channel.newPromise(); channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes())); - channelPromise.awaitUninterruptibly(); + final boolean completed = channelPromise.awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS); + if (!completed) { + throw new SocketTimeoutException(String.format("Request invocation timeout [%d ms] to remote address [%s]", timeoutMillis, channel.remoteAddress())); + } this.inboundAdapter = new NullInboundAdapter(); if (channelPromise.cause() != null) { throw new IOException("Request invocation failed", 
channelPromise.cause()); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index f671997e14..d46c79feb4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -113,7 +113,8 @@ public class DistributedMapCacheClientService extends AbstractControllerService context.getProperty(PORT).asInteger(), context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class), - versionNegotiatorFactory); + versionNegotiatorFactory, + getLogger()); } @OnShutdown diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java index ed25053319..5be429c1b3 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java @@ -27,12 +27,14 @@ import org.apache.nifi.distributed.cache.client.adapter.ValueInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.VoidInboundAdapter; import org.apache.nifi.distributed.cache.operations.MapOperation; import org.apache.nifi.distributed.cache.protocol.ProtocolVersion; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.remote.VersionNegotiatorFactory; import org.apache.nifi.ssl.SSLContextService; import java.io.IOException; import java.util.Collection; import java.util.Map; +import java.util.Objects; import java.util.Set; /** @@ -40,6 +42,7 @@ import java.util.Set; * communication services. */ public class NettyDistributedMapCacheClient extends DistributedCacheClient { + private final ComponentLog log; /** * Constructor. 
@@ -50,14 +53,18 @@ public class NettyDistributedMapCacheClient extends DistributedCacheClient { * @param sslContextService the SSL context (if any) associated with requests to the service; if not specified, * communications will not be encrypted * @param factory creator of object used to broker the version of the distributed cache protocol with the service + * @param log Component Log from instantiating Services */ public NettyDistributedMapCacheClient( final String hostname, final int port, final int timeoutMillis, final SSLContextService sslContextService, - final VersionNegotiatorFactory factory) { + final VersionNegotiatorFactory factory, + final ComponentLog log + ) { super(hostname, port, timeoutMillis, sslContextService, factory); + this.log = Objects.requireNonNull(log, "Component Log required"); } /** @@ -312,7 +319,11 @@ public class NettyDistributedMapCacheClient extends DistributedCacheClient { * @throws IOException if unable to communicate with the remote instance */ public void close() throws IOException { - invoke(new OutboundAdapter().write(MapOperation.CLOSE.value()), new VoidInboundAdapter()); + try { + invoke(new OutboundAdapter().write(MapOperation.CLOSE.value()), new VoidInboundAdapter()); + } catch (final Exception e) { + log.warn("Sending close command failed: closing channel", e); + } closeChannelPool(); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java index eb64c88362..191c3c2225 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java @@ -48,7 +48,7 @@ public class BooleanInboundAdapter implements InboundAdapter { * @return the service method response value */ public boolean getResult() { - return result; + return (result != null) && result; } @Override diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java new file mode 100644 index 0000000000..465b76cb42 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.distributed.cache.server.map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.operations.MapOperation; +import org.apache.nifi.distributed.cache.protocol.ProtocolVersion; +import org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler; +import org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder; +import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestDecoder; +import org.apache.nifi.event.transport.EventServer; +import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod; +import org.apache.nifi.event.transport.configuration.TransportProtocol; +import org.apache.nifi.event.transport.netty.NettyEventServerFactory; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.io.socket.NetworkUtils; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.SocketTimeoutException; +import 
java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class TestDistributedMapCacheClientService { + private static final String LOCALHOST = "127.0.0.1"; + + private static final int MAX_REQUEST_LENGTH = 64; + + private final Serializer<String> serializer = new StringSerializer(); + + private int port; + + private EventServer server; + + private TestRunner runner; + + @BeforeEach + public void setRunner() throws UnknownHostException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + port = NetworkUtils.getAvailableTcpPort(); + + final InetAddress serverAddress = InetAddress.getByName(LOCALHOST); + final NettyEventServerFactory serverFactory = new NettyEventServerFactory(serverAddress, port, TransportProtocol.TCP); + serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration()); + serverFactory.setShutdownTimeout(ShutdownQuietPeriod.QUICK.getDuration()); + final ComponentLog log = runner.getLogger(); + final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(ProtocolVersion.V3.value()); + + serverFactory.setHandlerSupplier(() -> Arrays.asList( + new CacheVersionResponseEncoder(), + new MapCacheRequestDecoder(log, MAX_REQUEST_LENGTH, MapOperation.values()), + new CacheVersionRequestHandler(log, versionNegotiator) + )); + server = serverFactory.getEventServer(); + } + + @AfterEach + public void shutdownServer() { + server.shutdown(); + } + + /** + * Service will hold request long enough for client timeout to be triggered, thus causing the request to fail. 
+ */ + @Test + public void testClientTimeoutOnServerNetworkFailure() throws InitializationException { + final String clientId = DistributedMapCacheClientService.class.getSimpleName(); + final DistributedMapCacheClientService clientService = new DistributedMapCacheClientService(); + + runner.addControllerService(clientId, clientService); + runner.setProperty(clientService, DistributedMapCacheClientService.HOSTNAME, LOCALHOST); + runner.setProperty(clientService, DistributedMapCacheClientService.PORT, String.valueOf(port)); + runner.setProperty(clientService, DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms"); + runner.enableControllerService(clientService); + runner.assertValid(); + + try { + assertThrows(SocketTimeoutException.class, () -> clientService.put("key", "value", serializer, serializer)); + } finally { + runner.disableControllerService(clientService); + } + } + + private static class StringSerializer implements Serializer<String> { + @Override + public void serialize(final String value, final OutputStream output) throws IOException { + output.write(value.getBytes(StandardCharsets.UTF_8)); + } + } +}
