http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java deleted file mode 100644 index 411396b..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java +++ /dev/null @@ -1,327 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.DiskStore; -import com.gemstone.gemfire.cache.DiskStoreFactory; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.wan.GatewayEventFilter; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.cache30.MyGatewayEventFilter1; -import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1; -import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.internal.cache.RegionQueue; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -@Category(DistributedTest.class) -public class SerialGatewaySenderQueueDUnitTest extends WANTestBase { - - @Test - public void testPrimarySecondaryQueueDrainInOrder_RR() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> WANTestBase.createCache(nyPort )); - vm3.invoke(() -> WANTestBase.createCache(nyPort )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, - false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, - false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.addQueueListener( "ln", false)); - vm5.invoke(() -> WANTestBase.addQueueListener( "ln", false)); - - vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR")); - vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR")); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln")); - - vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - Wait.pause(5000); - HashMap primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); - HashMap secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); - assertEquals(primarySenderUpdates, secondarySenderUpdates); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln")); - Wait.pause(2000); - vm4.invoke(() -> WANTestBase.pauseSender( "ln")); - Wait.pause(2000); - // We should wait till primarySenderUpdates and secondarySenderUpdates become same - // If in 300000ms they don't then throw error. - primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); - secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); - - checkPrimarySenderUpdatesOnVM5(primarySenderUpdates); -// assertIndexDetailsEquals(primarySenderUpdates, secondarySenderUpdates); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln")); - Wait.pause(5000); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); - HashMap receiverUpdates = (HashMap)vm2.invoke(() -> WANTestBase.checkQueue()); - - List destroyList = (List)primarySenderUpdates.get("Destroy"); - List createList = (List)receiverUpdates.get("Create"); - for(int i = 0; i< 1000; i++){ - assertEquals(destroyList.get(i), createList.get(i)); - } - assertEquals(primarySenderUpdates.get("Destroy"), receiverUpdates.get("Create")); - - Wait.pause(5000); - // We expect that after this much time secondary would have got batch removal message - // removing all the keys. - secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); - assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create")); - } - - protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) { - vm5.invoke(() -> WANTestBase.checkQueueOnSecondary( primarySenderUpdates )); - } - - @Test - public void testPrimarySecondaryQueueDrainInOrder_PR() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR")); - vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR")); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, - false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, - false, 100, 10, false, false, null, true,1, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.addQueueListener( "ln", false)); - vm5.invoke(() -> WANTestBase.addQueueListener( "ln", false)); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln")); - - vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - Wait.pause(5000); - HashMap primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); - HashMap secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); - checkPrimarySenderUpdatesOnVM5(primarySenderUpdates); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln")); - Wait.pause(4000); - vm4.invoke(() -> WANTestBase.pauseSender( "ln")); - Wait.pause(15000); - primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); - secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); - assertEquals(primarySenderUpdates, secondarySenderUpdates); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln")); - Wait.pause(5000); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - /** - * Test to validate that serial gateway sender queue diskSynchronous attribute - * when persistence of sender is enabled. - */ - @Test - public void test_ValidateSerialGatewaySenderQueueAttributes_1() { - Integer localLocPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer remoteLocPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort )); - - WANTestBase test = new WANTestBase(getTestMethodName()); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" - + localLocPort + "]"); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - - File directory = new File("TKSender" + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - directory.mkdir(); - File[] dirs1 = new File[] { directory }; - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - dsf.setDiskDirs(dirs1); - DiskStore diskStore = dsf.create("FORNY"); - - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setBatchConflationEnabled(true); - fact.setBatchSize(200); - fact.setBatchTimeInterval(300); - fact.setPersistenceEnabled(true);// enable the persistence - fact.setDiskSynchronous(true); - fact.setDiskStoreName("FORNY"); - fact.setMaximumQueueMemory(200); - fact.setAlertThreshold(1200); - GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); - fact.addGatewayEventFilter(myEventFilter1); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - fact.addGatewayTransportFilter(myStreamFilter2); - final IgnoredException exTKSender = IgnoredException.addIgnoredException("Could not connect"); - try { - GatewaySender sender1 = fact.create("TKSender", 2); - - AttributesFactory factory = new AttributesFactory(); - factory.addGatewaySenderId(sender1.getId()); - factory.setDataPolicy(DataPolicy.PARTITION); - Region region = cache.createRegionFactory(factory.create()).create( - "test_ValidateGatewaySenderAttributes"); - Set<GatewaySender> senders = cache.getGatewaySenders(); - assertEquals(senders.size(), 1); - GatewaySender gatewaySender = senders.iterator().next(); - Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender) - .getQueues(); - assertEquals(regionQueues.size(), GatewaySender.DEFAULT_DISPATCHER_THREADS); - RegionQueue regionQueue = regionQueues.iterator().next(); - assertEquals(true, regionQueue.getRegion().getAttributes() - .isDiskSynchronous()); - } finally { - exTKSender.remove(); - } - } - - /** - * Test to validate that serial gateway sender queue diskSynchronous attribute - * when persistence of sender is not enabled. - */ - @Test - public void test_ValidateSerialGatewaySenderQueueAttributes_2() { - Integer localLocPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer remoteLocPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort )); - - WANTestBase test = new WANTestBase(getTestMethodName()); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + localLocPort + "]"); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setBatchConflationEnabled(true); - fact.setBatchSize(200); - fact.setBatchTimeInterval(300); - fact.setPersistenceEnabled(false);//set persistence to false - fact.setDiskSynchronous(true); - fact.setMaximumQueueMemory(200); - fact.setAlertThreshold(1200); - GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); - fact.addGatewayEventFilter(myEventFilter1); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - fact.addGatewayTransportFilter(myStreamFilter2); - final IgnoredException exp = IgnoredException.addIgnoredException("Could not connect"); - try { - GatewaySender sender1 = fact.create("TKSender", 2); - - AttributesFactory factory = new AttributesFactory(); - factory.addGatewaySenderId(sender1.getId()); - factory.setDataPolicy(DataPolicy.PARTITION); - Region region = cache.createRegionFactory(factory.create()).create( - "test_ValidateGatewaySenderAttributes"); - Set<GatewaySender> senders = cache.getGatewaySenders(); - assertEquals(senders.size(), 1); - GatewaySender gatewaySender = senders.iterator().next(); - Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender) - .getQueues(); - assertEquals(regionQueues.size(), GatewaySender.DEFAULT_DISPATCHER_THREADS); - RegionQueue regionQueue = regionQueues.iterator().next(); - - assertEquals(false, regionQueue.getRegion().getAttributes() - .isDiskSynchronous()); - } finally { - exp.remove(); - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java deleted file mode 100644 index 7664ab4..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java +++ /dev/null @@ -1,547 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - - -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.Wait; - -/** - * - */ -@Category(DistributedTest.class) -public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends - WANTestBase { - - private static final long serialVersionUID = 1L; - - public SerialWANPersistenceEnabledGatewaySenderDUnitTest() { - super(); - } - - /** - * Just enable the persistence for GatewaySender and see if it remote site - * receives all the events. - */ - @Test - public void testReplicatedRegionWithGatewaySenderPersistenceEnabled() { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } - - /** - * Enable persistence for the Region and see if the remote site gets all the - * events. - */ - @Test - public void testPersistentReplicatedRegionWithGatewaySender() { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } - - /** - * Enable persistence for region as well as GatewaySender and see if remote - * site receives all the events. - * - */ - @Test - public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - - vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } - - /** - * Enable persistence for GatewaySender, kill the sender and restart it. Check - * if the remote site receives all the event. - */ - @Test - public void testReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, null, true )); - String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, null, true )); - - LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore); - LogWriterUtils.getLogWriter().info("The first ds is " + secondDStore); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - - // verify if the queue has all the events - // vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000 - // )); - // vm5.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000 - // )); - // - // vm2.invoke(() -> WANTestBase.validateRegionSize( - // testName + "_RR", 0 )); - // vm3.invoke(() -> WANTestBase.validateRegionSize( - // testName + "_RR", 0 )); - - // kill the vm - vm4.invoke(() -> WANTestBase.killSender()); - vm5.invoke(() -> WANTestBase.killSender()); - vm6.invoke(() -> WANTestBase.killSender()); - vm7.invoke(() -> WANTestBase.killSender()); - - LogWriterUtils.getLogWriter().info("Killed all the sender. "); - // restart the vm - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( - "ln", 2, false, 100, 10, false, true, null, - firstDStore, true )); - LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 "); - vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( - "ln", 2, false, 100, 10, false, true, null, - secondDStore, true )); - LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 "); - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - LogWriterUtils.getLogWriter().info("Started the sender in vm 4"); - - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - LogWriterUtils.getLogWriter().info("Started the sender in vm 5"); - try { - inv1.join(); - } catch (InterruptedException e) { - fail("Got interrupted exception while waiting for startSender to finish."); - } - - Wait.pause(5000); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } - - /** - * Enable persistence for Region and persistence for GatewaySender. Kill the - * vm with regions and bring that up again. Check if the remote site receives - * all the event. again? - * - */ - @Test - public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, null, true )); - String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, null, true )); - - LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore); - LogWriterUtils.getLogWriter().info("The first ds is " + secondDStore); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - - // kill the vm - vm4.invoke(() -> WANTestBase.killSender()); - vm5.invoke(() -> WANTestBase.killSender()); - - LogWriterUtils.getLogWriter().info("Killed the sender. "); - // restart the vm - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, firstDStore, true )); - LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 "); - vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, secondDStore, true )); - LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 "); - - vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - LogWriterUtils.getLogWriter().info("Started the sender in vm 4"); - - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - LogWriterUtils.getLogWriter().info("Started the sender in vm 5"); - try { - inv1.join(); - } catch (InterruptedException e) { - fail("Got interrupted exception while waiting for startSender to finish."); - } - - Wait.pause(5000); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } - - /** - * Enable persistence for Region. No persistence for GatewaySender. Kill the - * vm with regions and bring that up again. Check if the remote site receives - * all the event. again? - * - */ - @Test - public void testPersistentReplicatedRegionWithGatewaySender_Restart() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, - 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, false, - 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - - // verify if the queue has all the events - // vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000 - // )); - // vm5.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000 - // )); - // - // vm2.invoke(() -> WANTestBase.validateRegionSize( - // testName + "_RR", 0 )); - // vm3.invoke(() -> WANTestBase.validateRegionSize( - // testName + "_RR", 0 )); - - // kill the vm - vm4.invoke(() -> WANTestBase.killSender()); - vm5.invoke(() -> WANTestBase.killSender()); - - LogWriterUtils.getLogWriter().info("Killed the sender. "); - // restart the vm - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm4.invoke(() -> WANTestBase.createSender( - "ln", 2, false, 100, 10, false, false, null, true)); - LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 "); - vm5.invoke(() -> WANTestBase.createSender( - "ln", 2, false, 100, 10, false, false, null, true)); - LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 "); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - LogWriterUtils.getLogWriter().info("Started the sender in vm 4"); - - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - LogWriterUtils.getLogWriter().info("Started the sender in vm 5"); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - try { - inv1.join(); - } catch (InterruptedException e) { - fail("Got interrupted exception while waiting for startSender to finish."); - } - - Wait.pause(5000); - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } - - - /** - * Enable persistence for Region and persistence for GatewaySender. Kill the - * vm with regions and bring that up again. Check if the remote site receives - * all the event. again? - * In this case put is continuously happening while the vm is down. - */ - @Test - public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart2() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, null, true )); - String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, null, true )); - - LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore); - LogWriterUtils.getLogWriter().info("The first ds is " + secondDStore); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - - // kill the vm - vm4.invoke(() -> WANTestBase.killSender()); - vm5.invoke(() -> WANTestBase.killSender()); - - LogWriterUtils.getLogWriter().info("Killed the sender. "); - // restart the vm - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, firstDStore, true )); - LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 "); - vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false, - 100, 10, false, true, null, secondDStore, true )); - LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 "); - - vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - LogWriterUtils.getLogWriter().info("Started the sender in vm 4"); - - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - LogWriterUtils.getLogWriter().info("Started the sender in vm 5"); - try { - inv1.join(); - } catch (InterruptedException e) { - fail("Got interrupted exception while waiting for startSender to finish."); - } - - Wait.pause(5000); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } -}
