This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 288031f5f6 NIFI-11210 Added read timeout to
DistributedMapCacheClientService
288031f5f6 is described below
commit 288031f5f699ed952d5a53a45fc59f417dc05eaa
Author: Paul Grey <[email protected]>
AuthorDate: Fri Feb 24 17:52:05 2023 -0500
NIFI-11210 Added read timeout to DistributedMapCacheClientService
This closes #6994
Co-authored-by: David Handermann <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../event/transport/netty/NettyEventServer.java | 3 +-
.../client/CacheClientChannelInitializer.java | 2 +-
.../cache/client/CacheClientRequestHandler.java | 16 ++-
.../client/DistributedMapCacheClientService.java | 3 +-
.../client/NettyDistributedMapCacheClient.java | 15 ++-
.../client/adapter/BooleanInboundAdapter.java | 2 +-
.../map/TestDistributedMapCacheClientService.java | 119 +++++++++++++++++++++
7 files changed, 153 insertions(+), 7 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
index fb8aae59de..16ccd9d327 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServer.java
@@ -79,7 +79,8 @@ class NettyEventServer implements EventServer {
final String message = String.format("Close channel interrupted:
Remote Address [%s]", channel.remoteAddress());
throw new EventException(message, e);
} finally {
- group.shutdownGracefully(shutdownQuietPeriod.toMillis(),
shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS).syncUninterruptibly();
+ group.shutdownGracefully(shutdownQuietPeriod.toMillis(),
shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)
+ .awaitUninterruptibly(shutdownTimeout.toMillis(),
TimeUnit.MILLISECONDS);
}
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
index 13e50bd9f7..0c4c47ece2 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
@@ -79,7 +79,7 @@ public class CacheClientChannelInitializer extends
ChannelInitializer<Channel> {
channelPipeline.addFirst(new
IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(),
idleTimeout.getSeconds(), TimeUnit.SECONDS));
channelPipeline.addLast(new
WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
channelPipeline.addLast(new CacheClientHandshakeHandler(channel,
versionNegotiator, writeTimeout.toMillis()));
- channelPipeline.addLast(new CacheClientRequestHandler());
+ channelPipeline.addLast(new
CacheClientRequestHandler(writeTimeout.toMillis()));
channelPipeline.addLast(new CloseContextIdleStateHandler());
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
index 50eeb0de4a..7104768c09 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
@@ -27,6 +27,8 @@ import
org.apache.nifi.distributed.cache.client.adapter.NullInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeUnit;
/**
* The {@link io.netty.channel.ChannelHandler} responsible for sending client
requests and receiving server responses
@@ -44,6 +46,15 @@ public class CacheClientRequestHandler extends
ChannelInboundHandlerAdapter {
*/
private ChannelPromise channelPromise;
+ /**
+ * The network timeout associated with the connection
+ */
+ private final long timeoutMillis;
+
+ public CacheClientRequestHandler(final long timeoutMillis) {
+ this.timeoutMillis = timeoutMillis;
+ }
+
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg)
{
final ByteBuf byteBuf = (ByteBuf) msg;
@@ -97,7 +108,10 @@ public class CacheClientRequestHandler extends
ChannelInboundHandlerAdapter {
this.inboundAdapter = inboundAdapter;
channelPromise = channel.newPromise();
channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes()));
- channelPromise.awaitUninterruptibly();
+ final boolean completed =
channelPromise.awaitUninterruptibly(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (!completed) {
+ throw new SocketTimeoutException(String.format("Request
invocation timeout [%d ms] to remote address [%s]", timeoutMillis,
channel.remoteAddress()));
+ }
this.inboundAdapter = new NullInboundAdapter();
if (channelPromise.cause() != null) {
throw new IOException("Request invocation failed",
channelPromise.cause());
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index f671997e14..d46c79feb4 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -113,7 +113,8 @@ public class DistributedMapCacheClientService extends
AbstractControllerService
context.getProperty(PORT).asInteger(),
context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(),
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class),
- versionNegotiatorFactory);
+ versionNegotiatorFactory,
+ getLogger());
}
@OnShutdown
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
index ed25053319..5be429c1b3 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java
@@ -27,12 +27,14 @@ import
org.apache.nifi.distributed.cache.client.adapter.ValueInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.VoidInboundAdapter;
import org.apache.nifi.distributed.cache.operations.MapOperation;
import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.VersionNegotiatorFactory;
import org.apache.nifi.ssl.SSLContextService;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
/**
@@ -40,6 +42,7 @@ import java.util.Set;
* communication services.
*/
public class NettyDistributedMapCacheClient extends DistributedCacheClient {
+ private final ComponentLog log;
/**
* Constructor.
@@ -50,14 +53,18 @@ public class NettyDistributedMapCacheClient extends
DistributedCacheClient {
* @param sslContextService the SSL context (if any) associated with
requests to the service; if not specified,
* communications will not be encrypted
* @param factory creator of object used to broker the version
of the distributed cache protocol with the service
+ * @param log Component Log from instantiating Services
*/
public NettyDistributedMapCacheClient(
final String hostname,
final int port,
final int timeoutMillis,
final SSLContextService sslContextService,
- final VersionNegotiatorFactory factory) {
+ final VersionNegotiatorFactory factory,
+ final ComponentLog log
+ ) {
super(hostname, port, timeoutMillis, sslContextService, factory);
+ this.log = Objects.requireNonNull(log, "Component Log required");
}
/**
@@ -312,7 +319,11 @@ public class NettyDistributedMapCacheClient extends
DistributedCacheClient {
* @throws IOException if unable to communicate with the remote instance
*/
public void close() throws IOException {
- invoke(new OutboundAdapter().write(MapOperation.CLOSE.value()), new
VoidInboundAdapter());
+ try {
+ invoke(new OutboundAdapter().write(MapOperation.CLOSE.value()),
new VoidInboundAdapter());
+ } catch (final Exception e) {
+ log.warn("Sending close command failed: closing channel", e);
+ }
closeChannelPool();
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java
index eb64c88362..191c3c2225 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/BooleanInboundAdapter.java
@@ -48,7 +48,7 @@ public class BooleanInboundAdapter implements InboundAdapter {
* @return the service method response value
*/
public boolean getResult() {
- return result;
+ return (result != null) && result;
}
@Override
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java
new file mode 100644
index 0000000000..465b76cb42
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapCacheClientService.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.distributed.cache.operations.MapOperation;
+import org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
+import
org.apache.nifi.distributed.cache.server.codec.CacheVersionRequestHandler;
+import
org.apache.nifi.distributed.cache.server.codec.CacheVersionResponseEncoder;
+import org.apache.nifi.distributed.cache.server.codec.MapCacheRequestDecoder;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestDistributedMapCacheClientService {
+ private static final String LOCALHOST = "127.0.0.1";
+
+ private static final int MAX_REQUEST_LENGTH = 64;
+
+ private final Serializer<String> serializer = new StringSerializer();
+
+ private int port;
+
+ private EventServer server;
+
+ private TestRunner runner;
+
+ @BeforeEach
+ public void setRunner() throws UnknownHostException {
+ runner = TestRunners.newTestRunner(NoOpProcessor.class);
+ port = NetworkUtils.getAvailableTcpPort();
+
+ final InetAddress serverAddress = InetAddress.getByName(LOCALHOST);
+ final NettyEventServerFactory serverFactory = new
NettyEventServerFactory(serverAddress, port, TransportProtocol.TCP);
+
serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+
serverFactory.setShutdownTimeout(ShutdownQuietPeriod.QUICK.getDuration());
+ final ComponentLog log = runner.getLogger();
+ final VersionNegotiator versionNegotiator = new
StandardVersionNegotiator(ProtocolVersion.V3.value());
+
+ serverFactory.setHandlerSupplier(() -> Arrays.asList(
+ new CacheVersionResponseEncoder(),
+ new MapCacheRequestDecoder(log, MAX_REQUEST_LENGTH,
MapOperation.values()),
+ new CacheVersionRequestHandler(log, versionNegotiator)
+ ));
+ server = serverFactory.getEventServer();
+ }
+
+ @AfterEach
+ public void shutdownServer() {
+ server.shutdown();
+ }
+
+ /**
+ * Service will hold request long enough for client timeout to be
triggered, thus causing the request to fail.
+ */
+ @Test
+ public void testClientTimeoutOnServerNetworkFailure() throws
InitializationException {
+ final String clientId =
DistributedMapCacheClientService.class.getSimpleName();
+ final DistributedMapCacheClientService clientService = new
DistributedMapCacheClientService();
+
+ runner.addControllerService(clientId, clientService);
+ runner.setProperty(clientService,
DistributedMapCacheClientService.HOSTNAME, LOCALHOST);
+ runner.setProperty(clientService,
DistributedMapCacheClientService.PORT, String.valueOf(port));
+ runner.setProperty(clientService,
DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "500 ms");
+ runner.enableControllerService(clientService);
+ runner.assertValid();
+
+ try {
+ assertThrows(SocketTimeoutException.class, () ->
clientService.put("key", "value", serializer, serializer));
+ } finally {
+ runner.disableControllerService(clientService);
+ }
+ }
+
+ private static class StringSerializer implements Serializer<String> {
+ @Override
+ public void serialize(final String value, final OutputStream output)
throws IOException {
+ output.write(value.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+}