Repository: incubator-nifi Updated Branches: refs/heads/develop 739baa2e5 -> 315af02c5
NIFI-454: Use random ports instead of specific ports for running unit tests; updated abstract class and interface to expose the port being used Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/315af02c Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/315af02c Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/315af02c Branch: refs/heads/develop Commit: 315af02c595fdf62881b839f44227d121cbda46d Parents: 739baa2 Author: Mark Payne <[email protected]> Authored: Fri Jun 5 13:56:45 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Wed Jun 17 08:15:10 2015 -0400 ---------------------------------------------------------------------- .../cache/server/AbstractCacheServer.java | 8 +- .../distributed/cache/server/CacheServer.java | 2 + .../cache/server/DistributedCacheServer.java | 65 +++-- .../cache/server/TestServerAndClient.java | 292 +++++++++---------- 4 files changed, 185 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/315af02c/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java index 10f53b2..5c5a9cb 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java @@ -52,7 +52,6 @@ public abstract class AbstractCacheServer implements CacheServer { private final SSLContext sslContext; protected volatile boolean stopped = false; private final Set<Thread> processInputThreads = new CopyOnWriteArraySet<>(); - ; private volatile ServerSocketChannel serverSocketChannel; @@ -63,6 +62,11 @@ public abstract class AbstractCacheServer implements CacheServer { } @Override + public int getPort() { + return serverSocketChannel == null ? this.port : serverSocketChannel.socket().getLocalPort(); + } + + @Override public void start() throws IOException { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(true); @@ -117,7 +121,7 @@ public abstract class AbstractCacheServer implements CacheServer { return; } try (final InputStream in = new BufferedInputStream(rawInputStream); - final OutputStream out = new BufferedOutputStream(rawOutputStream)) { + final OutputStream out = new BufferedOutputStream(rawOutputStream)) { final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/315af02c/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java index fab8f13..d97c519 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/CacheServer.java @@ -24,4 +24,6 @@ public interface CacheServer { void stop() throws IOException; + int getPort(); + } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/315af02c/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java index 5907f50..44419b5 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java @@ -35,39 +35,39 @@ public abstract class DistributedCacheServer extends AbstractControllerService { public static final String EVICTION_STRATEGY_FIFO = "First In, First Out"; public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() - .name("Port") - .description("The port to listen on for incoming connections") - .required(true) - .addValidator(StandardValidators.PORT_VALIDATOR) - .defaultValue("4557") - .build(); + .name("Port") + .description("The port to listen on for incoming connections") + .required(true) + .addValidator(StandardValidators.PORT_VALIDATOR) + .defaultValue("4557") + .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("If specified, this service will be used to create an SSL Context that will be used " - + "to secure communications; if not specified, communications will not be secure") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); + .name("SSL Context Service") + .description("If specified, this service will be used to create an SSL Context that will be used " + + "to secure communications; if not specified, communications will not be secure") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); public static final PropertyDescriptor MAX_CACHE_ENTRIES = new PropertyDescriptor.Builder() - .name("Maximum Cache Entries") - .description("The maximum number of cache entries that the cache can hold") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10000") - .build(); + .name("Maximum Cache Entries") + .description("The maximum number of cache entries that the cache can hold") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .build(); public static final PropertyDescriptor EVICTION_POLICY = new PropertyDescriptor.Builder() - .name("Eviction Strategy") - .description("Determines which strategy should be used to evict values from the cache to make room for new entries") - .required(true) - .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO) - .defaultValue(EVICTION_STRATEGY_LFU) - .build(); + .name("Eviction Strategy") + .description("Determines which strategy should be used to evict values from the cache to make room for new entries") + .required(true) + .allowableValues(EVICTION_STRATEGY_LFU, EVICTION_STRATEGY_LRU, EVICTION_STRATEGY_FIFO) + .defaultValue(EVICTION_STRATEGY_LFU) + .build(); public static final PropertyDescriptor PERSISTENCE_PATH = new PropertyDescriptor.Builder() - .name("Persistence Directory") - .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only") - .required(false) - .addValidator(StandardValidators.createDirectoryExistsValidator(true, true)) - .build(); + .name("Persistence Directory") + .description("If specified, the cache will be persisted in the given directory; if not specified, the cache will be in-memory only") + .required(false) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, true)) + .build(); private volatile CacheServer cacheServer; @@ -103,5 +103,12 @@ public abstract class DistributedCacheServer extends AbstractControllerService { shutdownServer(); } + /** + * @return the port that the server is listening on, or -1 if the server has not been started + */ + public int getPort() { + return cacheServer == null ? -1 : cacheServer.getPort(); + } + protected abstract CacheServer createCacheServer(ConfigurationContext context); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/315af02c/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java index 42698b8..82e4a99 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java +++ b/nifi/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java @@ -16,8 +16,6 @@ */ package org.apache.nifi.distributed.cache.server; -import org.apache.commons.lang3.SystemUtils; -import org.apache.nifi.distributed.cache.server.DistributedSetCacheServer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -29,8 +27,11 @@ import java.io.OutputStream; import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.commons.lang3.SerializationException; +import org.apache.commons.lang3.SystemUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; @@ -38,16 +39,16 @@ import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.ssl.SSLContextService.ClientAuth; -import org.apache.nifi.ssl.StandardSSLContextService; import org.apache.nifi.util.MockConfigurationContext; import org.apache.nifi.util.MockControllerServiceInitializationContext; - -import org.apache.commons.lang3.SerializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import org.junit.Assume; -import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,20 +66,24 @@ public class TestServerAndClient { LOGGER = LoggerFactory.getLogger(TestServerAndClient.class); } - @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") @Test public void testNonPersistentSetServerAndClient() throws InitializationException, IOException { + + /** + * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug + * See: https://issues.apache.org/jira/browse/NIFI-437 + */ + Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", + SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); // Create server - final DistributedSetCacheServer server = new DistributedSetCacheServer(); - MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); - server.initialize(serverInitContext); - - final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); - final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); - server.startServer(serverContext); + final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + final DistributedSetCacheServer server = new SetServer(); + runner.addControllerService("server", server); + runner.enableControllerService(server); - final DistributedSetCacheClientService client = createClient(); + final DistributedSetCacheClientService client = createClient(server.getPort()); final Serializer<String> serializer = new StringSerializer(); final boolean added = client.addIfAbsent("test", serializer); assertTrue(added); @@ -98,24 +103,28 @@ public class TestServerAndClient { server.shutdownServer(); } - @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") @Test public void testPersistentSetServerAndClient() throws InitializationException, IOException { + /** + * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug + * See: https://issues.apache.org/jira/browse/NIFI-437 + */ + Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", + SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); - // Create server - final DistributedSetCacheServer server = new DistributedSetCacheServer(); - MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); - server.initialize(serverInitContext); final File dataFile = new File("target/cache-data"); deleteRecursively(dataFile); - final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); - serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); - final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); - server.startServer(serverContext); + // Create server + final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + final DistributedSetCacheServer server = new SetServer(); + runner.addControllerService("server", server); + runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.enableControllerService(server); - final DistributedSetCacheClientService client = createClient(); + DistributedSetCacheClientService client = createClient(server.getPort()); final Serializer<String> serializer = new StringSerializer(); final boolean added = client.addIfAbsent("test", serializer); final boolean added2 = client.addIfAbsent("test2", serializer); @@ -137,41 +146,45 @@ public class TestServerAndClient { assertFalse(containedAfterRemove); server.shutdownServer(); + client.close(); - final DistributedSetCacheServer newServer = new DistributedSetCacheServer(); - MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer, "server2"); - newServer.initialize(newServerInitContext); - - final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties, - newServerInitContext.getControllerServiceLookup()); - newServer.startServer(newServerContext); + final DistributedSetCacheServer newServer = new SetServer(); + runner.addControllerService("server2", newServer); + runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.enableControllerService(newServer); + client = createClient(newServer.getPort()); assertFalse(client.contains("test", serializer)); assertTrue(client.contains("test2", serializer)); newServer.shutdownServer(); + client.close(); } - @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") @Test public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException { + /** + * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug + * See: https://issues.apache.org/jira/browse/NIFI-437 + */ + Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", + SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); // Create server - final DistributedSetCacheServer server = new DistributedSetCacheServer(); - MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); - server.initialize(serverInitContext); - final File dataFile = new File("target/cache-data"); deleteRecursively(dataFile); - final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); - serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); - serverProperties.put(DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); - - final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); - server.startServer(serverContext); - - final DistributedSetCacheClientService client = createClient(); + // Create server + final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + final DistributedSetCacheServer server = new SetServer(); + runner.addControllerService("server", server); + runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); + runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_LFU); + runner.enableControllerService(server); + + DistributedSetCacheClientService client = createClient(server.getPort()); final Serializer<String> serializer = new StringSerializer(); final boolean added = client.addIfAbsent("test", serializer); waitABit(); @@ -199,13 +212,13 @@ public class TestServerAndClient { server.shutdownServer(); - final DistributedSetCacheServer newServer = new DistributedSetCacheServer(); - MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer, "server2"); - newServer.initialize(newServerInitContext); - final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties, - newServerInitContext.getControllerServiceLookup()); - newServer.startServer(newServerContext); + final DistributedSetCacheServer newServer = new SetServer(); + runner.addControllerService("server2", newServer); + runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.enableControllerService(newServer); + client.close(); + client = createClient(newServer.getPort()); assertTrue(client.contains("test", serializer)); assertTrue(client.contains("test2", serializer)); @@ -213,29 +226,33 @@ public class TestServerAndClient { assertTrue(client.contains("test4", serializer)); newServer.shutdownServer(); + client.close(); } - @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") @Test public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException { + /** + * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug + * See: https://issues.apache.org/jira/browse/NIFI-437 + */ + Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", + SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); - // Create server - final DistributedSetCacheServer server = new DistributedSetCacheServer(); - MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); - server.initialize(serverInitContext); final File dataFile = new File("target/cache-data"); deleteRecursively(dataFile); - final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); - serverProperties.put(DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); - serverProperties.put(DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); - serverProperties.put(DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO); - - final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); - server.startServer(serverContext); - - final DistributedSetCacheClientService client = createClient(); + // Create server + final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + final DistributedSetCacheServer server = new SetServer(); + runner.addControllerService("server", server); + runner.setProperty(server, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.setProperty(server, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); + runner.setProperty(server, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO); + runner.enableControllerService(server); + + DistributedSetCacheClientService client = createClient(server.getPort()); final Serializer<String> serializer = new StringSerializer(); // add 3 entries to the cache. But, if we add too fast, we'll have the same millisecond @@ -267,35 +284,42 @@ public class TestServerAndClient { assertTrue(client.contains("test3", serializer)); server.shutdownServer(); + client.close(); - final DistributedSetCacheServer newServer = new DistributedSetCacheServer(); - MockControllerServiceInitializationContext newServerInitContext = new MockControllerServiceInitializationContext(newServer, "server2"); - newServer.initialize(newServerInitContext); - final MockConfigurationContext newServerContext = new MockConfigurationContext(serverProperties, - newServerInitContext.getControllerServiceLookup()); - newServer.startServer(newServerContext); + final DistributedSetCacheServer newServer = new SetServer(); + runner.addControllerService("server2", newServer); + runner.setProperty(newServer, DistributedSetCacheServer.PERSISTENCE_PATH, dataFile.getAbsolutePath()); + runner.setProperty(newServer, DistributedSetCacheServer.MAX_CACHE_ENTRIES, "3"); + runner.setProperty(newServer, DistributedSetCacheServer.EVICTION_POLICY, DistributedSetCacheServer.EVICTION_STRATEGY_FIFO); + runner.enableControllerService(newServer); + client = createClient(newServer.getPort()); assertFalse(client.contains("test", serializer)); assertTrue(client.contains("test2", serializer)); assertTrue(client.contains("test3", serializer)); assertTrue(client.contains("test4", serializer)); newServer.shutdownServer(); + client.close(); } - @Ignore("Test fails when in a maven parallel build due to address/port already taken - need to vary these so tests can run in parallel") @Test public void testNonPersistentMapServerAndClient() throws InitializationException, IOException, InterruptedException { + /** + * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug + * See: https://issues.apache.org/jira/browse/NIFI-437 + */ + Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", + SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); + LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); - // Create server - final DistributedMapCacheServer server = new DistributedMapCacheServer(); - MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); - server.initialize(serverInitContext); - final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); - final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); - server.startServer(serverContext); + // Create server + final DistributedMapCacheServer server = new MapServer(); + final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class)); + runner.addControllerService("server", server); + runner.enableControllerService(server); DistributedMapCacheClientService client = new DistributedMapCacheClientService(); MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); @@ -303,6 +327,7 @@ public class TestServerAndClient { final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort())); clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); client.cacheConfig(clientContext); @@ -338,7 +363,7 @@ public class TestServerAndClient { try { client.containsKey("testKey", keySerializer); fail("Should be closed and not accessible"); - } catch (Exception e) { + } catch (final Exception e) { } client = null; @@ -346,12 +371,11 @@ public class TestServerAndClient { clientContext = null; DistributedMapCacheClientService client2 = new DistributedMapCacheClientService(); - MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2"); client2.initialize(clientInitContext2); MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, - clientInitContext2.getControllerServiceLookup()); + clientInitContext2.getControllerServiceLookup()); client2.cacheConfig(clientContext2); assertFalse(client2.putIfAbsent("testKey", "test", keySerializer, valueSerializer)); assertTrue(client2.containsKey("testKey", keySerializer)); @@ -360,13 +384,11 @@ public class TestServerAndClient { try { client2.containsKey("testKey", keySerializer); fail("Should have blown exception!"); - } catch (ConnectException e) { + } catch (final ConnectException e) { client2 = null; clientContext2 = null; clientInitContext2 = null; } - Thread.sleep(2000); - System.gc(); LOGGER.debug("end testNonPersistentMapServerAndClient"); } @@ -377,12 +399,12 @@ public class TestServerAndClient { * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug See: https://issues.apache.org/jira/browse/NIFI-437 */ Assume.assumeFalse("testClientTermination is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437", - SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); + SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8); LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); // Create server - final DistributedMapCacheServer server = new DistributedMapCacheServer(); - MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); + final DistributedMapCacheServer server = new MapServer(); + final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); server.initialize(serverInitContext); final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); @@ -428,65 +450,6 @@ public class TestServerAndClient { server.shutdownServer(); } - @Ignore - @Test - public void testSSLWith2RequestsWithServerTimeout() throws InitializationException, IOException, InterruptedException { - LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName()); - // Create SSLContext Service - final StandardSSLContextService sslService = new StandardSSLContextService(); - final MockControllerServiceInitializationContext sslServerInitContext = new MockControllerServiceInitializationContext(sslService, - "ssl-context"); - sslService.initialize(sslServerInitContext); - - final Map<PropertyDescriptor, String> sslServerProps = new HashMap<>(); - sslServerProps.put(StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); - sslServerProps.put(StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); - sslServerProps.put(StandardSSLContextService.KEYSTORE_TYPE, "JKS"); - sslServerProps.put(StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); - sslServerProps.put(StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); - sslServerProps.put(StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); - MockConfigurationContext sslServerContext = new MockConfigurationContext(sslServerProps, sslServerInitContext); - sslService.onConfigured(sslServerContext); - sslService.createSSLContext(ClientAuth.REQUIRED); - // Create server - final DistributedMapCacheServer server = new DistributedMapCacheServer(); - final MockControllerServiceInitializationContext serverInitContext = new MockControllerServiceInitializationContext(server, "server"); - server.initialize(serverInitContext); - - final Map<PropertyDescriptor, String> serverProperties = new HashMap<>(); - serverProperties.put(DistributedMapCacheServer.SSL_CONTEXT_SERVICE, "ssl-context"); - final MockConfigurationContext serverContext = new MockConfigurationContext(serverProperties, serverInitContext.getControllerServiceLookup()); - server.startServer(serverContext); - - DistributedMapCacheClientService client = new DistributedMapCacheClientService(); - MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); - client.initialize(clientInitContext); - - final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost"); - clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs"); - clientProperties.put(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE, "ssl-context"); - MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); - client.cacheConfig(clientContext); - final Serializer<String> valueSerializer = new StringSerializer(); - final Serializer<String> keySerializer = new StringSerializer(); - final Deserializer<String> deserializer = new StringDeserializer(); - - final String original = client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, deserializer); - assertEquals(null, original); - - Thread.sleep(30000); - try { - final boolean contains = client.containsKey("testKey", keySerializer); - assertTrue(contains); - } catch (IOException e) { - // this is due to the server timing out in the middle of this request - assertTrue(e.getMessage().contains("Channel is closed")); - } - - server.shutdownServer(); - } - private void waitABit() { try { Thread.sleep(10L); @@ -494,13 +457,14 @@ public class TestServerAndClient { } } - private DistributedSetCacheClientService createClient() throws InitializationException { + private DistributedSetCacheClientService createClient(final int port) throws InitializationException { final DistributedSetCacheClientService client = new DistributedSetCacheClientService(); - MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); + final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client"); client.initialize(clientInitContext); final Map<PropertyDescriptor, String> clientProperties = new HashMap<>(); clientProperties.put(DistributedSetCacheClientService.HOSTNAME, "localhost"); + clientProperties.put(DistributedSetCacheClientService.PORT, String.valueOf(port)); final MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext.getControllerServiceLookup()); client.onConfigured(clientContext); @@ -519,7 +483,7 @@ public class TestServerAndClient { @Override public String deserialize(final byte[] input) throws DeserializationException, IOException { - return (input.length == 0) ? null : new String(input, StandardCharsets.UTF_8); + return input.length == 0 ? null : new String(input, StandardCharsets.UTF_8); } } @@ -543,4 +507,30 @@ public class TestServerAndClient { } } } + + private static List<PropertyDescriptor> replacePortDescriptor(final List<PropertyDescriptor> descriptors) { + descriptors.remove(DistributedCacheServer.PORT); + descriptors.add(new PropertyDescriptor.Builder() + .name("Port") + .description("The port to listen on for incoming connections") + .required(true) + .addValidator(StandardValidators.createLongValidator(0L, 65535L, true)) + .defaultValue("0") + .build()); + return descriptors; + } + + private static class SetServer extends DistributedSetCacheServer { + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return replacePortDescriptor(super.getSupportedPropertyDescriptors()); + } + } + + private static class MapServer extends DistributedMapCacheServer { + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return replacePortDescriptor(super.getSupportedPropertyDescriptors()); + } + } }
