http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java deleted file mode 100644 index 19edd7d..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java +++ /dev/null @@ -1,737 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import com.jayway.awaitility.Awaitility; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.cache.EntryExistsException; -import com.gemstone.gemfire.cache.client.ServerOperationException; -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.BatchException70; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.Wait; - -import java.net.SocketException; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -/** - * Test the functionality of ParallelGatewaySender with multiple dispatchers. - * - */ -@Category(DistributedTest.class) -public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { - - public ConcurrentParallelGatewaySenderDUnitTest() { - super(); - } - - /** - * Normal happy scenario test case. - * checks that all the dispatchers have successfully - * dispatched something individually. - * - * @throws Exception - */ - @Test - public void testParallelPropagationConcurrentArtifacts() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - try { - // set the test hook to find out dispatched events by each of the - // concurrent dispatcher - vm4.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE )); - vm5.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE )); - vm6.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE )); - vm7.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - // verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - - int dispatched1 = (Integer)vm4.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" )); - int dispatched2 = (Integer)vm5.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" )); - int dispatched3 = (Integer)vm6.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" )); - int dispatched4 = (Integer)vm7.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" )); - - assertEquals(1000, dispatched1 + dispatched2 + dispatched3 + dispatched4); - } - finally { - vm4.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE )); - vm5.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE )); - vm6.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE )); - vm7.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE )); - } - } - - /** - * Normal happy scenario test case. - * @throws Exception - */ - @Test - public void testParallelPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - - /** - * Normal happy scenario test case when bucket division tests boundary cases. - * @throws Exception - */ - @Test - public void testParallelPropagationWithUnEqualBucketDivision() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - - /** - * Initially the GatewayReceiver is not up but later it comes up. - * We verify that - * @throws Exception - */ - @Test - public void testParallelPropagation_withoutRemoteSite() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //keep a larger batch to minimize number of exception occurrences in the log - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - //make sure all the senders are running before doing any puts - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - createReceiverInVMs(vm2, vm3); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - // Just making sure that though the remote site is started later, - // remote site is still able to get the data. Since the receivers are - // started before creating partition region it is quite possible that the - // region may loose some of the events. This needs to be handled by the code - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - /** - * Testing for colocated region with orderPolicy Partition - */ - @Test - public void testParallelPropagationColocatedPartitionedRegions() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); - - - vm4.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); - - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.putcolocatedPartitionedRegion( 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - customerRegionName, 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - orderRegionName, 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - shipmentRegionName, 1000 )); - } - - /** - * Local and remote sites are up and running. - * Local site cache is closed and the site is built again. - * Puts are done to local site. - * Expected: Remote site should receive all the events put after the local - * site was built back. - * - * @throws Exception - */ - @Test - public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION)); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - //-------------------Close and rebuild local site --------------------------------- - - vm4.invoke(() -> WANTestBase.killSender()); - vm5.invoke(() -> WANTestBase.killSender()); - vm6.invoke(() -> WANTestBase.killSender()); - vm7.invoke(() -> WANTestBase.killSender()); - - Integer regionSize = - (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" )); - LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - //------------------------------------------------------------------------------------ - - IgnoredException.addIgnoredException(EntryExistsException.class.getName()); - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - } - - /** - * Colocated regions using ConcurrentParallelGatewaySender. - * Normal scenario - * @throws Exception - */ - @Test - public void testParallelColocatedPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - } - - - /** - * Colocated regions using ConcurrentParallelGatewaySender. - * Normal scenario - * @throws Exception - */ - @Test - public void testParallelColocatedPropagationOrderPolicyPartition() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); - - vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - } - - @Test - public void testPartitionedParallelPropagationHA() throws Exception { - IgnoredException.addIgnoredException(SocketException.class.getName()); // for Connection reset - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - AsyncInvocation inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); - vm2.invoke(() -> Awaitility.await().atMost(30000, TimeUnit.MILLISECONDS).until(() -> - assertEquals("Failure in waiting for at least 10 events to be received by the receiver", - true, (getRegionSize(getTestMethodName() + "_PR") > 10 )))); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); - AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 )); - vm2.invoke(() -> Awaitility.await().atMost(30000, TimeUnit.MILLISECONDS).until(() -> - assertEquals("Failure in waiting for additional 2000 events to be received by the receiver ", - true,getRegionSize(getTestMethodName() + "_PR") > 7000 ))); - AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); - inv1.join(); - inv2.join(); - inv3.join(); - inv4.join(); - - vm6.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - vm7.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - - //verify all buckets drained on the sender nodes that up and running. - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - } - - @Test - public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm3, vm4); - - vm3.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 0, 2, isOffHeap())); - - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); - - startSenderInVMs("ln", vm3, vm4); - - vm3.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + "_PR", - 10 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize_PDX( - getTestMethodName() + "_PR", 10 )); - } - - @Test - public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender_StartedLater() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> WANTestBase.createReceiver_PDX( nyPort )); - - vm3.invoke(() -> WANTestBase.createCache_PDX( lnPort )); - vm4.invoke(() -> WANTestBase.createCache_PDX( lnPort )); - - vm3.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 0, 2, isOffHeap())); - - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); - - vm3.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + "_PR", - 10 )); - - startSenderInVMsAsync("ln", vm3, vm4); - - vm4.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + "_PR", - 40 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize_PDX( - getTestMethodName() + "_PR", 40 )); - } - - public static void setTestHook(String senderId, boolean hook) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - ConcurrentParallelGatewaySenderEventProcessor cProc = (ConcurrentParallelGatewaySenderEventProcessor)((AbstractGatewaySender)sender) - .getEventProcessor(); - if (cProc == null) return; - cProc.TEST_HOOK = hook; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java deleted file mode 100644 index da8a631..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -@SuppressWarnings("serial") -@Category(DistributedTest.class) -public class ConcurrentParallelGatewaySenderOffHeapDUnitTest extends - ConcurrentParallelGatewaySenderDUnitTest { - - public ConcurrentParallelGatewaySenderOffHeapDUnitTest() { - super(); - } - - @Override - public boolean isOffHeap() { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java deleted file mode 100644 index ab1c06b..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java +++ /dev/null @@ -1,796 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import org.junit.Ignore; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -@Category(DistributedTest.class) -public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTestBase { - private static final long serialVersionUID = 1L; - - public ConcurrentParallelGatewaySenderOperation_1_DUnitTest() { - super(); - } - - @Override - protected final void postSetUpWANTestBase() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - } - - @Test - public void testParallelGatewaySenderWithoutStarting() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - vm4.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); - vm5.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); - vm6.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); - vm7.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0 )); - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0 )); - } - - /** - * Defect 44323 (ParallelGatewaySender should not be started on Accessor node) - */ - @Test - public void testParallelGatewaySenderStartOnAccessorNode() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( - getTestMethodName() + "_PR", "ln", 1, 100 )); - vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( - getTestMethodName() + "_PR", "ln", 1, 100 )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - //start the senders - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); - } - - - /** - * Normal scenario in which the sender is paused in between. - * @throws Exception - */ - @Test - public void testParallelPropagationSenderPause() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - //make sure all the senders are running before doing any puts - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - //FIRST RUN: now, the senders are started. So, start the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); - - //now, pause all of the senders - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - Wait.pause(2000); - //SECOND RUN: keep one thread doing puts to the region - vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //verify region size remains on remote vm and is restricted below a specified limit (i.e. number of puts in the first run) - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 )); - } - - /** - * Normal scenario in which a paused sender is resumed. - * @throws Exception - */ - @Test - public void testParallelPropagationSenderResume() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - //make sure all the senders are running before doing any puts - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - //now, the senders are started. So, start the puts - vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //now, pause all of the senders - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - - //sleep for a second or two - Wait.pause(2000); - - //resume the senders - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); - - Wait.pause(2000); - - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - //find the region size on remote vm - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); - - } - - /** - * Negative scenario in which a sender that is stopped (and not paused) is resumed. - * Expected: resume is only valid for pause. If a sender which is stopped is resumed, - * it will not be started again. - * - * @throws Exception - */ - @Test - public void testParallelPropagationSenderResumeNegativeScenario() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(nyPort, vm4, vm5); - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - //wait till the senders are running - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - //start the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); - - //let the queue drain completely - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); - - //stop the senders - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - - //now, try to resume a stopped sender - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - - //do more puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //validate region size on remote vm to contain only the events put in local site - //before the senders are stopped. - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100 )); - } - - /** - * Normal scenario in which a sender is stopped. - * @throws Exception - */ - @Test - public void testParallelPropagationSenderStop() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - //make sure all the senders are running before doing any puts - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - //FIRST RUN: now, the senders are started. So, do some of the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); - - //now, stop all of the senders - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - vm6.invoke(() -> WANTestBase.stopSender( "ln" )); - vm7.invoke(() -> WANTestBase.stopSender( "ln" )); - - //SECOND RUN: keep one thread doing puts - vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //verify region size remains on remote vm and is restricted below a specified limit (number of puts in the first run) - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 )); - } - - /** - * Normal scenario in which a sender is stopped and then started again. - */ - @Test - public void testParallelPropagationSenderStartAfterStop() throws Throwable { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - - //make sure all the senders are running before doing any puts - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - //FIRST RUN: now, the senders are started. So, do some of the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); - - //now, stop all of the senders - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - vm6.invoke(() -> WANTestBase.stopSender( "ln" )); - vm7.invoke(() -> WANTestBase.stopSender( "ln" )); - - Wait.pause(2000); - - //SECOND RUN: do some of the puts after the senders are stopped - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); - - //start the senders again - AsyncInvocation vm4start = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation vm5start = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation vm6start = vm6.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation vm7start = vm7.invokeAsync(() -> WANTestBase.startSender( "ln" )); - int START_TIMEOUT = 30000; - vm4start.getResult(START_TIMEOUT); - vm5start.getResult(START_TIMEOUT); - vm6start.getResult(START_TIMEOUT); - vm7start.getResult(START_TIMEOUT); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); - - //SECOND RUN: do some more puts - AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - async.join(); - - //verify all the buckets on all the sender nodes are drained - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - //verify the events propagate to remote site - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - vm6.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - vm7.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - } - - /** - * Normal scenario in which a sender is stopped and then started again. - * Differs from above test case in the way that when the sender is starting from - * stopped state, puts are simultaneously happening on the region by another thread. - * @throws Exception - */ - @Ignore("Bug47553") - @Test - public void testParallelPropagationSenderStartAfterStop_Scenario2() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - //make sure all the senders are running before doing any puts - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - LogWriterUtils.getLogWriter().info("All the senders are now started"); - - //FIRST RUN: now, the senders are started. So, do some of the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); - - LogWriterUtils.getLogWriter().info("Done few puts"); - - //now, stop all of the senders - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - vm6.invoke(() -> WANTestBase.stopSender( "ln" )); - vm7.invoke(() -> WANTestBase.stopSender( "ln" )); - - LogWriterUtils.getLogWriter().info("All the senders are stopped"); - Wait.pause(2000); - - //SECOND RUN: do some of the puts after the senders are stopped - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - LogWriterUtils.getLogWriter().info("Done some more puts in second run"); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); - - //SECOND RUN: start async puts on region - AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); - LogWriterUtils.getLogWriter().info("Started high number of puts by async thread"); - - LogWriterUtils.getLogWriter().info("Starting the senders at the same time"); - //when puts are happening by another thread, start the senders - startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); - - LogWriterUtils.getLogWriter().info("All the senders are started"); - - async.join(); - - Wait.pause(2000); - - //verify all the buckets on all the sender nodes are drained - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - //verify that the queue size ultimately becomes zero. That means all the events propagate to remote site. - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); - } - - /** - * Normal scenario in which a sender is stopped and then started again on accessor node. - * @throws Exception - */ - @Test - public void testParallelPropagationSenderStartAfterStopOnAccessorNode() throws Throwable { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( - getTestMethodName() + "_PR", "ln", 1, 100)); - vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( - getTestMethodName() + "_PR", "ln", 1, 100)); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - - createReceiverInVMs(vm2, vm3); - - //make sure all the senders are not running on accessor nodes and running on non-accessor nodes - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - //FIRST RUN: now, the senders are started. So, do some of the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); - - //now, stop all of the senders - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - vm6.invoke(() -> WANTestBase.stopSender( "ln" )); - vm7.invoke(() -> WANTestBase.stopSender( "ln" )); - - //SECOND RUN: do some of the puts after the senders are stopped - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); - - //start the senders again - AsyncInvocation vm4start = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation vm5start = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation vm6start = vm6.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation vm7start = vm7.invokeAsync(() -> WANTestBase.startSender( "ln" )); - vm4start.join(); - vm5start.join(); - vm6start.join(); - vm7start.join(); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); - - //SECOND RUN: do some more puts - AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - async.join(); - - //verify all buckets drained only on non-accessor nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - //verify the events propagate to remote site - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); - } - - - /** - * Normal scenario in which a combinations of start, pause, resume operations - * is tested - */ - @Test - public void testStartPauseResumeParallelGatewaySender() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - LogWriterUtils.getLogWriter().info("Created cache on local site"); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); - - LogWriterUtils.getLogWriter().info("Created senders on local site"); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - LogWriterUtils.getLogWriter().info("Created PRs on local site"); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - LogWriterUtils.getLogWriter().info("Created PRs on remote site"); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - LogWriterUtils.getLogWriter().info("Done 1000 puts on local site"); - - //Since puts are already done on userPR, it will have the buckets created. - //During sender start, it will wait until those buckets are created for shadowPR as well. - //Start the senders in async threads, so colocation of shadowPR will be complete and - //missing buckets will be created in PRHARedundancyProvider.createMissingBuckets(). - startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - LogWriterUtils.getLogWriter().info("Started senders on local site"); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); - LogWriterUtils.getLogWriter().info("Done 5000 puts on local site"); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - LogWriterUtils.getLogWriter().info("Paused senders on local site"); - - vm4.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); - vm5.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); - vm6.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); - vm7.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - LogWriterUtils.getLogWriter().info("Started 1000 async puts on local site"); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); - LogWriterUtils.getLogWriter().info("Resumed senders on local site"); - - vm4.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); - vm5.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); - vm6.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); - vm7.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); - - try { - inv1.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail("Interrupted the async invocation."); - } - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 5000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 5000 )); - } -}
