http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANLocatorServerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANLocatorServerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANLocatorServerDUnitTest.java new file mode 100644 index 0000000..7973c05 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANLocatorServerDUnitTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package com.gemstone.gemfire.internal.cache.wan.misc;

import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
import static com.gemstone.gemfire.test.dunit.Assert.*;

import java.io.IOException;
import java.util.Properties;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.wan.GatewayReceiver;
import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.internal.AvailablePortHelper;
import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.test.dunit.Assert;
import com.gemstone.gemfire.test.dunit.Host;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.junit.categories.DistributedTest;

/**
 * Distributed test: starts three locators, a gateway receiver and a client
 * pool, disconnects and restarts all three locators, and then checks that the
 * client pool can still acquire a connection.
 */
@Category(DistributedTest.class)
public class WANLocatorServerDUnitTest extends WANTestBase {

  /**
   * Pool created by {@link #createClient}; read again by
   * {@link #tryNewConnection} when it is invoked later in the same VM.
   */
  static PoolImpl proxy;

  @Override
  protected final void postSetUpWANTestBase() throws Exception {
    // No additional per-test setup is required.
  }

  /**
   * Starts locators in vm0..vm2, a receiver in vm3 and a client in vm5, bounces
   * all three locators, and finally asks the client for a fresh connection.
   */
  @Test
  public void test_3Locators_2Servers() {
    int port1 = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    int port2 = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    int port3 = AvailablePortHelper.getRandomAvailablePortForDUnitSite();

    vm0.invoke(() -> WANLocatorServerDUnitTest.createLocator(port1, port2, port3, port1));
    vm1.invoke(() -> WANLocatorServerDUnitTest.createLocator(port1, port2, port3, port2));
    vm2.invoke(() -> WANLocatorServerDUnitTest.createLocator(port1, port2, port3, port3));

    vm3.invoke(() -> WANLocatorServerDUnitTest.createReceiver(port1, port2, port3));
    vm5.invoke(() -> WANLocatorServerDUnitTest.createClient(port1, port2, port3));

    // Take every locator down ...
    vm0.invoke(() -> WANLocatorServerDUnitTest.disconnect());
    vm1.invoke(() -> WANLocatorServerDUnitTest.disconnect());
    vm2.invoke(() -> WANLocatorServerDUnitTest.disconnect());

    // ... and bring them back on the same ports.
    vm0.invoke(() -> WANLocatorServerDUnitTest.createLocator(port1, port2, port3, port1));
    vm1.invoke(() -> WANLocatorServerDUnitTest.createLocator(port1, port2, port3, port2));
    vm2.invoke(() -> WANLocatorServerDUnitTest.createLocator(port1, port2, port3, port3));

    vm5.invoke(() -> WANLocatorServerDUnitTest.tryNewConnection());
  }

  /**
   * Builds the value used for the {@code LOCATORS} property: the three ports
   * rendered as {@code localhost[p1],localhost[p2],localhost[p3]}.
   */
  private static String locatorList(Integer port1, Integer port2, Integer port3) {
    return "localhost[" + port1 + "],localhost[" + port2 + "],localhost[" + port3 + "]";
  }

  /**
   * Connects this VM to the distributed system (id 1) and starts a locator in
   * it.
   *
   * @param port1 port of the first locator in the locator list
   * @param port2 port of the second locator in the locator list
   * @param port3 port of the third locator in the locator list
   * @param startingPort port on which this VM starts its own locator
   */
  public static void createLocator(Integer port1, Integer port2, Integer port3,
      Integer startingPort) {
    WANTestBase test = new WANTestBase(getTestMethodName());
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(DISTRIBUTED_SYSTEM_ID, "1");
    props.setProperty(LOCATORS, locatorList(port1, port2, port3));
    props.setProperty(START_LOCATOR, "localhost[" + startingPort
        + "],server=true,peer=true,hostname-for-clients=localhost");
    test.getSystem(props);
  }

  /**
   * Creates a cache connected through the three locators and starts a manually
   * started gateway receiver on a single random port.
   */
  public static void createReceiver(Integer port1, Integer port2, Integer port3) {
    WANTestBase test = new WANTestBase(getTestMethodName());
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, locatorList(port1, port2, port3));

    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);

    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    // Start and end port are identical so the receiver binds exactly this port.
    fact.setStartPort(port);
    fact.setEndPort(port);
    fact.setManualStart(true);
    GatewayReceiver receiver = fact.create();
    try {
      receiver.start();
    } catch (IOException e) {
      fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port, e);
    }
  }

  /**
   * Creates a cache connected through the three locators and starts a cache
   * server on a random port.
   */
  public static void createServer(Integer port1, Integer port2, Integer port3) {
    WANTestBase test = new WANTestBase(getTestMethodName());
    Properties props = test.getDistributedSystemProperties();
    props.setProperty(MCAST_PORT, "0");
    props.setProperty(LOCATORS, locatorList(port1, port2, port3));

    InternalDistributedSystem ds = test.getSystem(props);
    cache = CacheFactory.create(ds);

    CacheServer server = cache.addCacheServer();
    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
    server.setPort(port);
    try {
      server.start();
    } catch (IOException e) {
      fail("Test " + test.getName() + " failed to start CacheServer on port " + port, e);
    }
    LogWriterUtils.getLogWriter().info(
        "Server Started on port : " + port + " : server : " + server);
  }

  /** Disconnects this VM from its distributed system. */
  public static void disconnect() {
    WANTestBase test = new WANTestBase(getTestMethodName());
    test.getSystem().disconnect();
  }

  /**
   * Creates a client cache and a pool restricted to the gateway receiver
   * server group, stores the pool in {@link #proxy}, and verifies that one
   * connection can be acquired and closed.
   */
  public static void createClient(Integer port1, Integer port2, Integer port3) {
    ClientCacheFactory cf = new ClientCacheFactory();
    cache = (Cache) cf.create();

    PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory();
    pf.setReadTimeout(0);
    pf.setIdleTimeout(-1);
    pf.setMinConnections(4);
    pf.setServerGroup(GatewayReceiver.RECEIVER_GROUP);
    pf.addLocator("localhost", port1);
    pf.addLocator("localhost", port2);
    pf.addLocator("localhost", port3);
    pf.init((GatewaySender) null);
    proxy = ((PoolImpl) pf.create("KISHOR_POOL"));

    Connection con1 = proxy.acquireConnection();
    try {
      con1.close(true);
    } catch (Exception e) {
      fail("createClient failed", e);
    }
  }

  /**
   * Acquires one more connection from {@link #proxy} and fails the test if
   * that raises any exception. The connection is closed again in the same way
   * {@link #createClient} does, so it is not left dangling.
   */
  public static void tryNewConnection() {
    try {
      Connection con1 = proxy.acquireConnection();
      con1.close(true);
    } catch (Exception e) {
      Assert.fail("No Exception expected", e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java new file mode 100644 index 0000000..00bfa1a --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package com.gemstone.gemfire.internal.cache.wan.misc;

import static org.junit.Assert.*;

import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.Wait;
import com.gemstone.gemfire.test.dunit.WaitCriterion;
import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
import com.gemstone.gemfire.test.junit.categories.DistributedTest;
import com.gemstone.gemfire.test.junit.categories.FlakyTest;

/**
 * Distributed tests for WAN replication between a sender site ("ln") and a
 * receiver site ("ny") when SSL is enabled on both sides, only on the
 * receiver, or only on the sender.
 */
@Category(DistributedTest.class)
public class WANSSLDUnitTest extends WANTestBase {

  public WANSSLDUnitTest() {
    super();
  }

  /** Both sides use SSL: all 1000 entries must arrive at the receiver. */
  @Test
  public void testSenderSSLReceiverSSL() {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    vm2.invoke(() -> WANTestBase.createReceiverWithSSL(nyPort));

    vm4.invoke(() -> WANTestBase.createCacheWithSSL(lnPort));

    vm4.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", null, isOffHeap()));

    vm4.invoke(() -> WANTestBase.startSender("ln"));

    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ln", isOffHeap()));

    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));

    vm2.invoke(() -> WANTestBase.validateRegionSize(
        getTestMethodName() + "_RR", 1000));
  }

  /**
   * Only the receiver uses SSL: the scenario is expected to end in an
   * exception whose cause mentions "Server expecting SSL connection".
   */
  @Test
  public void testSenderNoSSLReceiverSSL() {
    IgnoredException.addIgnoredException("Unexpected IOException");
    IgnoredException.addIgnoredException("SSL Error");
    IgnoredException.addIgnoredException("Unrecognized SSL message");
    try {
      Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
      Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

      vm2.invoke(() -> WANTestBase.createReceiverWithSSL(nyPort));

      vm4.invoke(() -> WANTestBase.createCache(lnPort));

      vm4.invoke(() -> WANTestBase.createSender("ln", 2,
          false, 100, 10, false, false, null, true));

      vm2.invoke(() -> WANTestBase.createReplicatedRegion(
          getTestMethodName() + "_RR", null, isOffHeap()));

      vm4.invoke(() -> WANTestBase.startSender("ln"));

      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
          getTestMethodName() + "_RR", "ln", isOffHeap()));

      vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));

      vm2.invoke(() -> WANTestBase.validateRegionSize(
          getTestMethodName() + "_RR", 1000));
      // fail() throws an AssertionError, which the catch below does not intercept.
      fail("Expected exception as only Receiver is SSL enabled. Not Sender");
    } catch (Exception e) {
      // Guard against a missing cause or message so an unexpected exception is
      // reported as an assertion failure (with context) rather than as an NPE.
      Throwable cause = e.getCause();
      String message = (cause == null) ? null : cause.getMessage();
      assertTrue("Expected cause mentioning 'Server expecting SSL connection' but got: " + e,
          message != null && message.contains("Server expecting SSL connection"));
    }
  }

  /**
   * Only the sender uses SSL: the single entry put on the sender side must not
   * show up in the receiver's region.
   */
  @Test
  public void testSenderSSLReceiverNoSSL() {
    IgnoredException.addIgnoredException("Acceptor received unknown");
    IgnoredException.addIgnoredException("failed accepting client");
    IgnoredException.addIgnoredException("Error in connecting to peer");
    IgnoredException.addIgnoredException("Remote host closed connection during handshake");
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    vm2.invoke(() -> WANTestBase.createReceiver());

    vm4.invoke(() -> WANTestBase.createCacheWithSSL(lnPort));

    vm4.invoke(() -> WANTestBase.createSender("ln", 2,
        false, 100, 10, false, false, null, true));

    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", null, isOffHeap()));

    vm4.invoke(() -> WANTestBase.startSender("ln"));

    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
        getTestMethodName() + "_RR", "ln", isOffHeap()));

    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1));

    Boolean doesSizeMatch = (Boolean) vm2.invoke(() -> WANSSLDUnitTest.ValidateSSLRegionSize(
        getTestMethodName() + "_RR", 1));

    assertFalse(doesSizeMatch);
  }

  /**
   * Waits up to two seconds (polling every 500 ms) for the named region to
   * reach the given size and reports whether it did. Unlike a hard assertion,
   * a timeout is not an error here; the caller decides what the result means.
   *
   * @param regionName name of the region, without the leading separator
   * @param regionSize size the region is compared against
   * @return {@code true} if the region holds exactly {@code regionSize} entries
   */
  public static boolean ValidateSSLRegionSize(String regionName, final int regionSize) {
    final Region<?, ?> r = cache.getRegion(Region.SEPARATOR + regionName);
    assertNotNull(r);
    WaitCriterion wc = new WaitCriterion() {
      public boolean done() {
        return r.size() == regionSize;
      }

      public String description() {
        return "Expected region size " + regionSize + " but actual size is " + r.size();
      }
    };
    // throwOnTimeout is false: not reaching the size is a legitimate outcome.
    Wait.waitForCriterion(wc, 2000, 500, false);

    return r.size() == regionSize;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java new file mode 100644 index 0000000..4148789 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WanAutoDiscoveryDUnitTest.java @@ -0,0 +1,561 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.gemstone.gemfire.GemFireConfigException; +import com.gemstone.gemfire.IncompatibleSystemException; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.OSProcess; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.Assert; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.Host; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; + +@Category(DistributedTest.class) +public class WanAutoDiscoveryDUnitTest extends WANTestBase { + + + public WanAutoDiscoveryDUnitTest() { + super(); + } + + @Override + protected void postSetUpWANTestBase() throws Exception { + final Host host = Host.getHost(0); + } + + /** + * Test to validate that sender can not be started without locator started. + * else GemFireConfigException will be thrown. 
+ */ + @Test + public void test_GatewaySender_Started_Before_Locator() { + try { + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + vm0.invoke(() -> WANTestBase.createCache(port)); + vm0.invoke(() -> WANTestBase.createSender("ln",2,false,100,10,false,false, null, false)); + fail("Expected GemFireConfigException but not thrown"); + } + catch (Exception e) { + if (!(e.getCause() instanceof GemFireConfigException)) { + Assert.fail("Expected GemFireConfigException but received :", e); + } + } + } + + /** + * Test to validate that all locators in one DS should have same name. Though + * this test passes, it causes other below tests to fail. In this test, VM1 is + * throwing IncompatibleSystemException after startInitLocator. I think, after + * throwing this exception, locator is not stopped properly and hence other + * tests are failing. + * + * @throws Exception + */ + @Ignore + @Test + public void test_AllLocatorsInDSShouldHaveDistributedSystemId() throws Exception { + try { + Integer lnLocPort1 = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + + Integer lnLocPort2 = (Integer)vm1.invoke(() -> WANTestBase.createSecondLocator( 2, + lnLocPort1 )); + fail("Expected IncompatibleSystemException but not thrown"); + } + catch (Exception e) { + if (!(e.getCause()instanceof IncompatibleSystemException)) { + Assert.fail("Expected IncompatibleSystemException but received :", e); + } + } + } + + /** + * Test to validate that multiple locators added on LN site and multiple + * locators on Ny site recognizes each other + * @throws Exception + */ + @Test + public void test_NY_Recognises_ALL_LN_Locators() throws Exception { + Set<InetSocketAddress> locatorPorts = new HashSet<>(); + Map<Integer, Set<InetSocketAddress>> dsVsPort = new HashMap<>(); + dsVsPort.put(1, locatorPorts); + + Integer lnLocPort1 = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort1)); + 
+ Integer lnLocPort2 = (Integer)vm1.invoke(() -> WANTestBase.createSecondLocator( 1, lnLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort2)); + + locatorPorts = new HashSet<>(); + dsVsPort.put(2, locatorPorts); + Integer nyLocPort1 = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", nyLocPort1)); + + Integer nyLocPort2 = (Integer)vm3.invoke(() -> WANTestBase.createSecondRemoteLocator( + 2, nyLocPort1, lnLocPort1)); + locatorPorts.add(new InetSocketAddress("localhost", nyLocPort2)); + + vm0.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm1.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm2.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm3.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + } + + /** + * Test to validate that multiple locators added two sets receive eachothers + * hostname for client setting even when the locator is started through the API. 
+ */ + @Test + public void locatorsReceiveHostnameForClientsFromRemoteSite() throws Exception { + Set<InetSocketAddress> locatorPorts = new HashSet<>(); + Map<Integer, Set<InetSocketAddress>> dsVsPort = new HashMap<>(); + dsVsPort.put(1, locatorPorts); + + Integer lnLocPort1 = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort1)); + + Integer lnLocPort2 = (Integer)vm1.invoke(() -> WANTestBase.createSecondLocator( 1, lnLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort2)); + + locatorPorts = new HashSet<>(); + dsVsPort.put(2, locatorPorts); + Integer nyLocPort1 = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", nyLocPort1)); + + Integer nyLocPort2 = (Integer)vm3.invoke(() -> WANTestBase.createSecondRemoteLocatorWithAPI( + 2, nyLocPort1, lnLocPort1, "localhost")); + locatorPorts.add(new InetSocketAddress("localhost", nyLocPort2)); + + vm0.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm1.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm2.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm3.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + } + + /** + * Test to validate that TK site's locator is recognized by LN and NY. Test to + * validate that HK site's locator is recognized by LN , NY, TK. 
+ */ + @Test + public void test_NY_Recognises_TK_AND_HK_Through_LN_Locator() { + + Map<Integer, Set<InetSocketAddress>> dsVsPort = new HashMap<>(); + + Set<InetSocketAddress> locatorPorts = new HashSet<>(); + dsVsPort.put(1, locatorPorts); + + Integer lnLocPort1 = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort1)); + + locatorPorts = new HashSet<>(); + dsVsPort.put(2, locatorPorts); + Integer nyLocPort1 = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", nyLocPort1)); + + locatorPorts = new HashSet<>(); + dsVsPort.put(3, locatorPorts); + Integer tkLocPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", tkLocPort)); + + locatorPorts = new HashSet<>(); + dsVsPort.put(4, locatorPorts); + Integer hkLocPort = (Integer)vm3.invoke(() -> WANTestBase.createFirstRemoteLocator( 4, lnLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", hkLocPort)); + + vm0.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm1.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm2.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm3.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + } + + @Test + public void test_TK_Recognises_LN_AND_NY() { + + Map<Integer, Set<InetSocketAddress>> dsVsPort = new HashMap<>(); + + Set<InetSocketAddress> locatorPorts = new HashSet<>(); + dsVsPort.put(1, locatorPorts); + + Integer lnLocPort1 = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort1)); + + locatorPorts = new HashSet<>(); + dsVsPort.put(2, locatorPorts); + Integer nyLocPort1 = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocPort1 )); + locatorPorts.add(new 
InetSocketAddress("localhost", nyLocPort1)); + + locatorPorts = new HashSet<>(); + dsVsPort.put(3, locatorPorts); + Integer tkLocPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, nyLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", tkLocPort)); + + + vm0.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm1.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm2.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + } + + @Test + public void test_NY_Recognises_TK_AND_HK_Simultaneously() { + Map<Integer, Set<InetSocketAddress>> dsVsPort = new HashMap<>(); + + Set<InetSocketAddress> locatorPortsln = new HashSet<>(); + dsVsPort.put(1, locatorPortsln); + Integer lnLocPort1 = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + locatorPortsln.add(new InetSocketAddress("localhost", lnLocPort1)); + + Set<InetSocketAddress> locatorPortsny = new HashSet<>(); + dsVsPort.put(2, locatorPortsny); + Integer nyLocPort1 = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocPort1 )); + locatorPortsny.add(new InetSocketAddress("localhost", nyLocPort1)); + + int AsyncInvocationArrSize = 4; + AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize]; + + Set<InetSocketAddress> locatorPortstk = new HashSet<>(); + dsVsPort.put(3, locatorPortstk); + async[0] = vm2.invokeAsync(() -> WANTestBase.createFirstRemoteLocator( 3, lnLocPort1 )); + + Set<InetSocketAddress> locatorPortshk = new HashSet<>(); + dsVsPort.put(4, locatorPortshk); + async[1] = vm3.invokeAsync(() -> WANTestBase.createFirstRemoteLocator(4, nyLocPort1)); + + ArrayList<Integer> locatorPortsln2 = new ArrayList<Integer>(); + async[2] = vm4.invokeAsync(() -> WANTestBase.createSecondLocator( 1, lnLocPort1 )); + + ArrayList<Integer> locatorPortsny2 = new ArrayList<Integer>(); + async[3] = vm5.invokeAsync(() -> WANTestBase.createSecondLocator( 2, nyLocPort1 )); + + + try { + async[0].join(); + async[1].join(); 
+ async[2].join(); + async[3].join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + locatorPortstk.add(new InetSocketAddress("localhost", (Integer)async[0].getReturnValue())); + locatorPortshk.add(new InetSocketAddress("localhost", (Integer)async[1].getReturnValue())); + locatorPortsln.add(new InetSocketAddress("localhost", (Integer)async[2].getReturnValue())); + locatorPortsny.add(new InetSocketAddress("localhost", (Integer)async[3].getReturnValue())); + + vm0.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm1.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm2.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm3.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + } + + + @Test + public void test_LN_Sender_recognises_ALL_NY_Locators() { + + Integer lnLocPort1 = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer lnLocPort2 = (Integer)vm5.invoke(() -> WANTestBase.createSecondLocator( 1, lnLocPort1 )); + + vm2.invoke(() -> WANTestBase.createCache(lnLocPort1, lnLocPort2)); + + vm2.invoke(() -> WANTestBase.createSender("ln",2,false,100,10,false,false, null, true)); + + Integer nyLocPort1 = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocPort1 )); + + vm2.invoke(() -> WANTestBase.startSender("ln")); + + //Since to fix Bug#46289, we have moved call to initProxy in getConnection which will be called only when batch is getting dispatched. 
+ //So for locator discovery callback to work, its now expected that atleast try to send a batch so that proxy will be initialized + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm2.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10)); + + Integer nyLocPort2 = (Integer)vm3 + .invoke(() -> WANTestBase.createSecondRemoteLocator( + 2, nyLocPort1, lnLocPort1 )); + + InetSocketAddress locatorToWaitFor = new InetSocketAddress("localhost", + nyLocPort2); + + vm2.invoke(() -> WANTestBase.checkLocatorsinSender("ln", locatorToWaitFor )); + + Integer nyLocPort3 = (Integer)vm4 + .invoke(() -> WANTestBase.createSecondRemoteLocator( + 2, nyLocPort1, lnLocPort1 )); + + InetSocketAddress locatorToWaitFor2 = new InetSocketAddress("localhost", nyLocPort3); + + vm2.invoke(() -> WANTestBase.checkLocatorsinSender("ln", locatorToWaitFor2 )); + + } + + @Test + public void test_RingTopology() { + + int [] ports = AvailablePortHelper.getRandomAvailableTCPPortsForDUnitSite(4); + + final Set<String> site1LocatorsPort = new HashSet<String>(); + site1LocatorsPort.add("localhost["+ports[0]+"]"); + + final Set<String> site2LocatorsPort = new HashSet<String>(); + site2LocatorsPort.add("localhost["+ports[1]+"]"); + + final Set<String> site3LocatorsPort = new HashSet<String>(); + site3LocatorsPort.add("localhost["+ports[2]+"]"); + + final Set<String> site4LocatorsPort = new HashSet<String>(); + site4LocatorsPort.add("localhost["+ports[3]+"]"); + + Map<Integer, Set<String>> dsVsPort = new HashMap<Integer, Set<String>>(); + dsVsPort.put(1, site1LocatorsPort); + dsVsPort.put(2, site2LocatorsPort); + dsVsPort.put(3, site3LocatorsPort); + dsVsPort.put(4, site4LocatorsPort); + + int AsyncInvocationArrSize = 9; + AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize]; + + async[0] = vm0.invokeAsync(() -> WANTestBase.createLocator( 1, ports[0], site1LocatorsPort, site2LocatorsPort)); + + async[1] = 
vm1.invokeAsync(() -> WANTestBase.createLocator( 2, ports[1], site2LocatorsPort, site3LocatorsPort)); + + async[2] = vm2.invokeAsync(() -> WANTestBase.createLocator( 3, ports[2], site3LocatorsPort, site4LocatorsPort)); + + async[3] = vm3.invokeAsync(() -> WANTestBase.createLocator( 4, ports[3], site4LocatorsPort, site1LocatorsPort)); + + // pause(5000); + try { + async[0].join(); + async[1].join(); + async[2].join(); + async[3].join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail("Could not join async operations"); + } + + vm0.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm1.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm2.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm3.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + } + + @Ignore + @Test + public void test_3Sites3Locators() { + final Set<String> site1LocatorsPort = new HashSet<String>(); + int site1Port1 = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + site1LocatorsPort.add("localhost["+site1Port1+"]"); + int site1Port2 = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + site1LocatorsPort.add("localhost["+site1Port2+"]"); + int site1Port3 = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + site1LocatorsPort.add("localhost["+site1Port3+"]"); + + final Set<String> site2LocatorsPort = new HashSet<String>(); + int site2Port1 = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + site2LocatorsPort.add("localhost["+site2Port1+"]"); + int site2Port2 = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + site2LocatorsPort.add("localhost["+site2Port2+"]"); + int site2Port3 = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + site2LocatorsPort.add("localhost["+site2Port3+"]"); + + final Set<String> site3LocatorsPort = new HashSet<String>(); + int site3Port1 = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + 
site3LocatorsPort.add("localhost["+site3Port1+"]"); + final int site3Port2 = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + site3LocatorsPort.add("localhost["+site3Port2+"]"); + int site3Port3 = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + site3LocatorsPort.add("localhost["+site3Port3+"]"); + + Map<Integer, Set<String>> dsVsPort = new HashMap<Integer, Set<String>>(); + dsVsPort.put(1, site1LocatorsPort); + dsVsPort.put(2, site2LocatorsPort); + dsVsPort.put(3, site3LocatorsPort); + + int AsyncInvocationArrSize = 9; + AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize]; + + async[0] = vm0.invokeAsync(() -> WANTestBase.createLocator( 1, site1Port1, site1LocatorsPort, site2LocatorsPort)); + + async[8] = vm0.invokeAsync(() -> WANTestBase.checkAllSiteMetaDataFor3Sites(dsVsPort)); + + async[1] = vm1.invokeAsync(() -> WANTestBase.createLocator( 1, site1Port2, site1LocatorsPort, site2LocatorsPort)); + async[2] = vm2.invokeAsync(() -> WANTestBase.createLocator( 1, site1Port3, site1LocatorsPort, site2LocatorsPort)); + + async[3] = vm3.invokeAsync(() -> WANTestBase.createLocator( 2, site2Port1, site2LocatorsPort, site3LocatorsPort)); + async[4] = vm4.invokeAsync(() -> WANTestBase.createLocator( 2, site2Port2, site2LocatorsPort, site3LocatorsPort)); + async[5] = vm5.invokeAsync(() -> WANTestBase.createLocator( 2, site2Port3, site2LocatorsPort, site3LocatorsPort)); + + async[6] = vm6.invokeAsync(() -> WANTestBase.createLocator( 3, site3Port1, site3LocatorsPort, site1LocatorsPort)); + async[7] = vm7.invokeAsync(() -> WANTestBase.createLocator( 3, site3Port2, site3LocatorsPort, site1LocatorsPort)); + + WANTestBase.createLocator(3, site3Port3, site3LocatorsPort, site1LocatorsPort); + long startTime = System.currentTimeMillis(); + + try { + async[0].join(); + async[1].join(); + async[2].join(); + async[3].join(); + async[4].join(); + async[5].join(); + async[6].join(); + async[7].join(); + async[8].join(); + } catch (InterruptedException 
e) { + e.printStackTrace(); + fail("Could not join async operations"); + } + Long endTime = null; + try { + endTime = (Long)async[8].getResult(); + } + catch (Throwable e) { + e.printStackTrace(); + Assert.fail("Could not get end time", e); + } + + LogWriterUtils.getLogWriter().info("Time taken for all 9 locators discovery in 3 sites: " + (endTime.longValue() - startTime)); + + vm0.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm1.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm2.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm3.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm4.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm5.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm6.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + vm7.invoke(() -> WANTestBase.checkAllSiteMetaDataFor3Sites( dsVsPort )); + WANTestBase.checkAllSiteMetaDataFor3Sites(dsVsPort); + } + + /** Creates a first peer locator on vm0 via createFirstPeerLocator(1) and a second one on vm1 via createSecondPeerLocator(1, lnLocPort1), i.e. the second is given the port returned for the first. Both returned ports are recorded as "localhost" socket addresses in the set stored under key 1 of dsVsPort (the key equals the first argument of the create calls; presumably the distributed-system id, confirm against WANTestBase). The test then runs WANTestBase.checkAllSiteMetaData with that map on vm0 and on vm1. */ + @Test + public void test_LN_Peer_Locators_Exchange_Information() { + Set<InetSocketAddress> locatorPorts = new HashSet<>(); + Map<Integer, Set<InetSocketAddress>> dsVsPort = new HashMap<>(); + dsVsPort.put(1, locatorPorts); /* the set is stored while still empty; the map holds this same Set instance, so the add() calls below are visible through dsVsPort */ + + Integer lnLocPort1 = (Integer)vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort1)); + + Integer lnLocPort2 = (Integer)vm1.invoke(() -> WANTestBase.createSecondPeerLocator( 1, lnLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort2)); + + vm0.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm1.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + } + /** Builds three groups of locators and records their "localhost" addresses in dsVsPort under keys 1, 2 and 3: two locators for key 1 (vm0, vm1), two for key 2 (vm2, vm3) and three for key 3 (vm4, vm5, vm6). Groups 2 and 3 are created with the createFirstRemotePeerLocator / createSecondRemotePeerLocator helpers, each call receiving as its last argument a port returned for a locator of an earlier group (presumably the remote locator to contact; confirm against WANTestBase). The last locator, on vm6, is created with createSecondRemoteLocator instead of createSecondRemotePeerLocator. Finally WANTestBase.checkAllSiteMetaData is run with the complete map on vm0 through vm6. NOTE(review): the method name mentions 5 peer locators and 1 server locator, while the body creates seven locators; confirm the name is still accurate. */ + @Test + public void test_LN_NY_TK_5_PeerLocators_1_ServerLocator() { + Map<Integer, Set<InetSocketAddress>> dsVsPort = new HashMap<>(); + + + Set<InetSocketAddress> locatorPorts = new HashSet<>(); + dsVsPort.put(1, locatorPorts); + Integer lnLocPort1 = 
(Integer)vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort1)); + Integer lnLocPort2 = (Integer)vm1.invoke(() -> WANTestBase.createSecondPeerLocator( 1, lnLocPort1 )); + locatorPorts.add(new InetSocketAddress("localhost", lnLocPort2)); + + locatorPorts = new HashSet<>(); /* a fresh set for the next key; the set already stored under key 1 is not modified by this reassignment */ + dsVsPort.put(2, locatorPorts); + Integer nyLocPort1 = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemotePeerLocator(2, lnLocPort1)); + locatorPorts.add(new InetSocketAddress("localhost", nyLocPort1)); + Integer nyLocPort2 = (Integer)vm3.invoke(() -> WANTestBase.createSecondRemotePeerLocator( 2, nyLocPort1, lnLocPort2)); + locatorPorts.add(new InetSocketAddress("localhost", nyLocPort2)); + + locatorPorts = new HashSet<>(); + dsVsPort.put(3, locatorPorts); + Integer tkLocPort1 = (Integer)vm4.invoke(() -> WANTestBase.createFirstRemotePeerLocator(3, nyLocPort1)); + locatorPorts.add(new InetSocketAddress("localhost", tkLocPort1)); + Integer tkLocPort2 = (Integer)vm5.invoke(() -> WANTestBase.createSecondRemotePeerLocator( 3, tkLocPort1, nyLocPort1)); + locatorPorts.add(new InetSocketAddress("localhost", tkLocPort2)); + Integer tkLocPort3 = (Integer)vm6.invoke(() -> WANTestBase.createSecondRemoteLocator( 3, tkLocPort1, nyLocPort2)); + locatorPorts.add(new InetSocketAddress("localhost", tkLocPort3)); + + // pause(5000); + + vm0.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm1.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm2.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm3.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm4.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm5.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + vm6.invoke(() -> WANTestBase.checkAllSiteMetaData( dsVsPort )); + + } + /** Checks that starting and stopping a locator in this JVM does not leave extra threads running. Records Thread.activeCount(), creates a locator with WANTestBase.createFirstRemoteLocator(2, port) on a port obtained from AvailablePortHelper.getRandomAvailablePortForDUnitSite(), calls disconnectFromDS(), and then polls up to 60 times, sleeping 1000 ms between polls, until Thread.activeCount() is no longer greater than the recorded value. If the count is still greater afterwards, it calls OSProcess.printStacks(0) and fails with a message reporting the expected and the observed counts. NOTE(review): Thread.activeCount() is documented as an estimate for the current thread group, so unrelated threads started in the meantime could affect this comparison. */ + @Test + public void testNoThreadLeftBehind() { + // Get active thread count before test + int activeThreadCountBefore = Thread.activeCount(); + + // 
Start / stop locator + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + WANTestBase.createFirstRemoteLocator( 2, port ); + disconnectFromDS(); + + // Validate active thread count after test + + // Wait up to 60 seconds for all threads started during the test + // (including the 'WAN Locator Discovery Thread') to stop + // Note: Awaitility is not being used since it adds threads + for (int i=0; i<60; i++) { + if (Thread.activeCount() > activeThreadCountBefore) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + fail("Caught the following exception waiting for threads to stop: " + e); + } + } else { + break; + } + } + + // Fail if the active thread count after the test is greater than the active thread count before the test + if (Thread.activeCount() > activeThreadCountBefore) { + OSProcess.printStacks(0); + StringBuilder builder = new StringBuilder(); + builder + .append("Expected ") + .append(activeThreadCountBefore) + .append(" threads but found ") + .append(Thread.activeCount()) + .append(". Check log file for a thread dump."); + fail(builder.toString()); + } + } +}
