exceptionfactory commented on code in PR #6414:
URL: https://github.com/apache/nifi/pull/6414#discussion_r982858938
##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java:
##########
@@ -298,6 +301,45 @@ public void testLimitServiceReadSize() throws
InitializationException, IOExcepti
}
}
+ @Test
+ public void testIncompleteHandshakeScenario() throws
InitializationException, IOException {
+ // Default port used by Distributed Server and Client
+ final int port = 4557;
+
+ // This is used to simulate a DistributedCacheServer that does not
complete the handshake response
+ final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(true);
+ serverSocketChannel.bind(new InetSocketAddress(port));
+ final Thread thread = startServerSocket(serverSocketChannel);
Review Comment:
This approach is somewhat brittle as failures could lead to the Thread
continuing to run. See the `StringNettyEventSenderFactoryTest` for one example
of a Netty-based socket server that could be used for testing.
##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java:
##########
@@ -312,6 +354,26 @@ private DistributedMapCacheClientService
createClient(final int port) throws Ini
return client;
}
+ private Thread startServerSocket(ServerSocketChannel serverSocketChannel) {
+ final Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ serverSocketChannel.accept();
+ } catch (final IOException e) {
+ return;
+ }
+ }
+ }
+ };
+ final Thread thread = new Thread(runnable);
+ thread.setDaemon(true);
+ thread.setName("Test Server Socket");
+ thread.start();
+ return thread;
+ }
Review Comment:
As mentioned above, it would be better to replace this approach with a test
socket server using Netty, which provides a more robust approach to managing
threads.
##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java:
##########
@@ -298,6 +301,45 @@ public void testLimitServiceReadSize() throws
InitializationException, IOExcepti
}
}
+ @Test
+ public void testIncompleteHandshakeScenario() throws
InitializationException, IOException {
+ // Default port used by Distributed Server and Client
+ final int port = 4557;
+
+ // This is used to simulate a DistributedCacheServer that does not
complete the handshake response
+ final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(true);
+ serverSocketChannel.bind(new InetSocketAddress(port));
+ final Thread thread = startServerSocket(serverSocketChannel);
+
+ DistributedMapCacheClientService client = new
DistributedMapCacheClientService();
+ MockControllerServiceInitializationContext clientInitContext = new
MockControllerServiceInitializationContext(client, "client");
+ client.initialize(clientInitContext);
+
+ final Map<PropertyDescriptor, String> clientProperties = new
HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME,
"localhost");
+ clientProperties.put(DistributedMapCacheClientService.PORT,
String.valueOf(port));
+
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT,
"1 sec");
+
+ MockConfigurationContext clientContext = new
MockConfigurationContext(clientProperties,
clientInitContext.getControllerServiceLookup());
+ client.onEnabled(clientContext);
Review Comment:
This approach should be changed to use the NiFi Mock `TestRunner`, with
`setProperty()` and `enableControllerService()` instead.
##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java:
##########
@@ -298,6 +301,45 @@ public void testLimitServiceReadSize() throws
InitializationException, IOExcepti
}
}
+ @Test
+ public void testIncompleteHandshakeScenario() throws
InitializationException, IOException {
+ // Default port used by Distributed Server and Client
+ final int port = 4557;
+
+ // This is used to simulate a DistributedCacheServer that does not
complete the handshake response
+ final ServerSocketChannel serverSocketChannel =
ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(true);
+ serverSocketChannel.bind(new InetSocketAddress(port));
+ final Thread thread = startServerSocket(serverSocketChannel);
+
+ DistributedMapCacheClientService client = new
DistributedMapCacheClientService();
+ MockControllerServiceInitializationContext clientInitContext = new
MockControllerServiceInitializationContext(client, "client");
+ client.initialize(clientInitContext);
+
+ final Map<PropertyDescriptor, String> clientProperties = new
HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME,
"localhost");
+ clientProperties.put(DistributedMapCacheClientService.PORT,
String.valueOf(port));
+
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT,
"1 sec");
+
+ MockConfigurationContext clientContext = new
MockConfigurationContext(clientProperties,
clientInitContext.getControllerServiceLookup());
+ client.onEnabled(clientContext);
+
+ final Serializer<String> valueSerializer = new StringSerializer();
+ final Serializer<String> keySerializer = new StringSerializer();
+ final Deserializer<String> deserializer = new StringDeserializer();
+
+ try {
+ client.getAndPutIfAbsent("testKey", "test", keySerializer,
valueSerializer, deserializer);
+ fail("Client operation should have failed due to it not being
connected to a DistributedCacheServer");
+ } catch (IOException e) {
+ // Verify cause of exception was handshake completion timeout
+ assertEquals("Handshake timed out before completion.",
e.getCause().getMessage());
+ } finally {
+ thread.interrupt();
+ serverSocketChannel.close();
+ }
Review Comment:
Instead of try-catch with `fail()`, it would be better to use
`assertThrows()`. It is also best to avoid checking for the exact error message.
##########
nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java:
##########
@@ -298,6 +301,45 @@ public void testLimitServiceReadSize() throws
InitializationException, IOExcepti
}
}
+ @Test
+ public void testIncompleteHandshakeScenario() throws
InitializationException, IOException {
+ // Default port used by Distributed Server and Client
+ final int port = 4557;
Review Comment:
This should be changed to use a dynamic port. The
`NetworkUtils.getAvailableTcpPort()` method can be used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]