http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java new file mode 100644 index 0000000..19edd7d --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java @@ -0,0 +1,737 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import com.jayway.awaitility.Awaitility; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.cache.EntryExistsException; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.BatchException70; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.Wait; + +import java.net.SocketException; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Test the functionality of ParallelGatewaySender with multiple dispatchers. + * + */ +@Category(DistributedTest.class) +public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase { + + public ConcurrentParallelGatewaySenderDUnitTest() { + super(); + } + + /** + * Normal happy scenario test case. + * checks that all the dispatchers have successfully + * dispatched something individually. 
+ * + * @throws Exception + */ + @Test + public void testParallelPropagationConcurrentArtifacts() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> 
WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + try { + // set the test hook to find out dispatched events by each of the + // concurrent dispatcher + vm4.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE )); + vm5.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE )); + vm6.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE )); + vm7.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + // verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + + int dispatched1 = (Integer)vm4.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" )); + int dispatched2 = (Integer)vm5.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" )); + int dispatched3 = (Integer)vm6.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" )); + int dispatched4 = (Integer)vm7.invoke(() -> WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" )); + + assertEquals(1000, dispatched1 + dispatched2 + dispatched3 + dispatched4); + } + finally { + vm4.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE )); + vm5.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE )); + vm6.invoke(() -> 
ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE )); + vm7.invoke(() -> ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE )); + } + } + + /** + * Normal happy scenario test case. + * @throws Exception + */ + @Test + public void testParallelPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //before doing any puts, let the senders be running in order to 
ensure that + //not a single event will be lost + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + + /** + * Normal happy scenario test case when bucket division tests boundary cases. + * @throws Exception + */ + @Test + public void testParallelPropagationWithUnEqualBucketDivision() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, 
false, null, true, 7, OrderPolicy.PARTITION )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + + /** + * Initially the GatewayReceiver is not up but later it comes up. 
+ * We verify that the remote site still receives all the events once its receiver is started. + * @throws Exception + */ + @Test + public void testParallelPropagation_withoutRemoteSite() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //keep a larger batch to minimize number of exception occurrences in the log + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + //make sure all the senders are running before doing any puts + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() ->
WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + createReceiverInVMs(vm2, vm3); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + // Just making sure that though the remote site is started later, + // remote site is still able to get the data. Since the receivers are + // started before creating partition region it is quite possible that the + // region may lose some of the events. This needs to be handled by the code + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + /** + * Testing for colocated region with orderPolicy Partition + */ + @Test + public void testParallelPropagationColocatedPartitionedRegions() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false,
null, true, 5, OrderPolicy.PARTITION )); + + + vm4.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); + + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() )); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.putcolocatedPartitionedRegion( 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + customerRegionName, 1000 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + orderRegionName, 1000 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + shipmentRegionName, 1000 )); + } + + /** + * Local and remote sites are up and running. + * Local site cache is closed and the site is built again. + * Puts are done to local site. 
+ * Expected: Remote site should receive all the events put after the local + * site was built back. + * + * @throws Exception + */ + @Test + public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION)); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(() -> 
WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + //-------------------Close and rebuild local site --------------------------------- + + vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + vm6.invoke(() -> WANTestBase.killSender()); + vm7.invoke(() -> WANTestBase.killSender()); + + Integer regionSize = + (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" )); + LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm6.invoke(() -> WANTestBase.createCache( lnPort )); + vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, 
isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + //------------------------------------------------------------------------------------ + + IgnoredException.addIgnoredException(EntryExistsException.class.getName()); + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + } + + /** + * Colocated regions using ConcurrentParallelGatewaySender. 
+ * Normal scenario + * @throws Exception + */ + @Test + public void testParallelColocatedPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + //verify all buckets drained on all sender nodes. 
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + } + + + /** + * Colocated regions using ConcurrentParallelGatewaySender. + * Normal scenario + * @throws Exception + */ + @Test + public void testParallelColocatedPropagationOrderPolicyPartition() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION )); + + vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, 
vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + } + + @Test + public void testPartitionedParallelPropagationHA() throws Exception { + IgnoredException.addIgnoredException(SocketException.class.getName()); // for Connection reset + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + 
getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + AsyncInvocation inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); + vm2.invoke(() -> Awaitility.await().atMost(30000, TimeUnit.MILLISECONDS).until(() -> + assertEquals("Failure in waiting for at least 10 events to be received by the receiver", + true, (getRegionSize(getTestMethodName() + "_PR") > 10 )))); + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 )); + vm2.invoke(() -> Awaitility.await().atMost(30000, TimeUnit.MILLISECONDS).until(() -> + assertEquals("Failure in waiting for additional 2000 events to be received by the receiver ", + true,getRegionSize(getTestMethodName() + "_PR") > 7000 ))); + AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + + vm6.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + vm7.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + + //verify all buckets drained on the sender nodes that are up and running.
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + @Test + public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm3, vm4); + + vm3.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 0, 2, isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); + + startSenderInVMs("ln", vm3, vm4); + + vm3.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + "_PR", + 10 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize_PDX( + getTestMethodName() + "_PR", 10 )); + } + + @Test + public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender_StartedLater() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm2.invoke(() -> WANTestBase.createReceiver_PDX( nyPort )); + + vm3.invoke(() -> WANTestBase.createCache_PDX( lnPort )); + vm4.invoke(() -> WANTestBase.createCache_PDX( lnPort )); + + vm3.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + 
vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 0, 2, isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap())); + + vm3.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + "_PR", + 10 )); + + startSenderInVMsAsync("ln", vm3, vm4); + + vm4.invoke(() -> WANTestBase.doPutsPDXSerializable( getTestMethodName() + "_PR", + 40 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize_PDX( + getTestMethodName() + "_PR", 40 )); + } + + /** + * Sets the TEST_HOOK flag on the event processor of the gateway sender with the given id. + * Returns without doing anything when that sender has no event processor. + * + * @param senderId id of the gateway sender to look up in the cache + * @param hook value assigned to the processor's TEST_HOOK field + * @throws IllegalStateException if the cache has no gateway sender with the given id + */ + public static void setTestHook(String senderId, boolean hook) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + if (sender == null) { + throw new IllegalStateException("No gateway sender with id " + senderId + " exists in this cache"); + } + ConcurrentParallelGatewaySenderEventProcessor cProc = (ConcurrentParallelGatewaySenderEventProcessor)((AbstractGatewaySender)sender) + .getEventProcessor(); + if (cProc == null) { + return; + } + cProc.TEST_HOOK = hook; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java new file mode 100644 index 0000000..da8a631 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +@SuppressWarnings("serial") +@Category(DistributedTest.class) +public class ConcurrentParallelGatewaySenderOffHeapDUnitTest extends + ConcurrentParallelGatewaySenderDUnitTest { + + public ConcurrentParallelGatewaySenderOffHeapDUnitTest() { + super(); + } + + @Override + public boolean isOffHeap() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java new file mode 100644 index 0000000..ab1c06b --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +@Category(DistributedTest.class) +public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTestBase { + private static final long serialVersionUID = 1L; + + public ConcurrentParallelGatewaySenderOperation_1_DUnitTest() { + super(); + } + + @Override + protected final void postSetUpWANTestBase() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + } + + @Test + public void testParallelGatewaySenderWithoutStarting() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + 
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + vm4.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); + vm5.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); + vm6.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); + vm7.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0 )); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0 )); + } + + /** + * Defect 44323 
(ParallelGatewaySender should not be started on Accessor node) + */ + @Test + public void testParallelGatewaySenderStartOnAccessorNode() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( + getTestMethodName() + "_PR", "ln", 1, 100 )); + vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( + getTestMethodName() + "_PR", "ln", 1, 100 )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //start the senders + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); + } + + + /** + * Normal scenario in which the sender is paused in between. + * @throws Exception + */ + @Test + public void testParallelPropagationSenderPause() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> 
WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //make sure all the senders are running before doing any puts + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + //FIRST RUN: now, the senders are started. So, start the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); + + //now, pause all of the senders + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + Wait.pause(2000); + //SECOND RUN: keep one thread doing puts to the region + vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //verify region size remains on remote vm and is restricted below a specified limit (i.e. number of puts in the first run) + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 )); + } + + /** + * Normal scenario in which a paused sender is resumed. 
+ * @throws Exception + */ + @Test + public void testParallelPropagationSenderResume() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //make sure all the senders are running before doing any puts + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> 
WANTestBase.waitForSenderRunningState( "ln" )); + + //now, the senders are started. So, start the puts + vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //now, pause all of the senders + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //sleep for a second or two + Wait.pause(2000); + + //resume the senders + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + + Wait.pause(2000); + + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + //find the region size on remote vm + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); + + } + + /** + * Negative scenario in which a sender that is stopped (and not paused) is resumed. + * Expected: resume is only valid for pause. If a sender which is stopped is resumed, + * it will not be started again. 
+ * + * @throws Exception + */ + @Test + public void testParallelPropagationSenderResumeNegativeScenario() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + //sender VMs join the local (ln) site, as in the other tests of this class + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + //wait till the senders are running + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + //start the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); + + //let the queue drain completely + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); + + //stop the senders + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + + //now, try to resume a stopped sender + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + + //do more puts + vm4.invoke(() -> 
WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //validate region size on remote vm to contain only the events put in local site + //before the senders are stopped. + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100 )); + } + + /** + * Normal scenario in which a sender is stopped. + * @throws Exception + */ + @Test + public void testParallelPropagationSenderStop() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", 
vm4, vm5, vm6, vm7); + + //make sure all the senders are running before doing any puts + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + //FIRST RUN: now, the senders are started. So, do some of the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); + + //now, stop all of the senders + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + vm6.invoke(() -> WANTestBase.stopSender( "ln" )); + vm7.invoke(() -> WANTestBase.stopSender( "ln" )); + + //SECOND RUN: keep one thread doing puts + vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //verify region size remains on remote vm and is restricted below a specified limit (number of puts in the first run) + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 )); + } + + /** + * Normal scenario in which a sender is stopped and then started again. 
+ */ + @Test + public void testParallelPropagationSenderStartAfterStop() throws Throwable { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + + //make sure all the senders are running before doing any puts + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> 
WANTestBase.waitForSenderRunningState( "ln" )); + + //FIRST RUN: now, the senders are started. So, do some of the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); + + //now, stop all of the senders + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + vm6.invoke(() -> WANTestBase.stopSender( "ln" )); + vm7.invoke(() -> WANTestBase.stopSender( "ln" )); + + Wait.pause(2000); + + //SECOND RUN: do some of the puts after the senders are stopped + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); + + //start the senders again + AsyncInvocation vm4start = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); + AsyncInvocation vm5start = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); + AsyncInvocation vm6start = vm6.invokeAsync(() -> WANTestBase.startSender( "ln" )); + AsyncInvocation vm7start = vm7.invokeAsync(() -> WANTestBase.startSender( "ln" )); + int START_TIMEOUT = 30000; + vm4start.getResult(START_TIMEOUT); + vm5start.getResult(START_TIMEOUT); + vm6start.getResult(START_TIMEOUT); + vm7start.getResult(START_TIMEOUT); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); + + //THIRD RUN: do some more puts now that the senders have been started again + AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + async.join(); + + //verify all the buckets on all the sender nodes are drained + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + //verify the events propagate to remote site + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + vm6.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + vm7.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + } + + /** + * Normal scenario in which a sender is stopped and then started again. + * Differs from above test case in the way that when the sender is starting from + * stopped state, puts are simultaneously happening on the region by another thread. + * @throws Exception + */ + @Ignore("Bug47553") + @Test + public void testParallelPropagationSenderStartAfterStop_Scenario2() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 7, 
OrderPolicy.KEY )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + //make sure all the senders are running before doing any puts + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + LogWriterUtils.getLogWriter().info("All the senders are now started"); + + //FIRST RUN: now, the senders are started. So, do some of the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); + + LogWriterUtils.getLogWriter().info("Done few puts"); + + //now, stop all of the senders + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + vm6.invoke(() -> WANTestBase.stopSender( "ln" )); + vm7.invoke(() -> WANTestBase.stopSender( "ln" )); + + LogWriterUtils.getLogWriter().info("All the senders are stopped"); + Wait.pause(2000); + + //SECOND RUN: do some of the puts after the senders are stopped + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + LogWriterUtils.getLogWriter().info("Done some more puts in second run"); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); + + //SECOND RUN: start async puts on region + AsyncInvocation async = vm4.invokeAsync(() -> 
WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); + LogWriterUtils.getLogWriter().info("Started high number of puts by async thread"); + + LogWriterUtils.getLogWriter().info("Starting the senders at the same time"); + //when puts are happening by another thread, start the senders + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + LogWriterUtils.getLogWriter().info("All the senders are started"); + + async.join(); + + Wait.pause(2000); + + //verify all the buckets on all the sender nodes are drained + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + //verify that the queue size ultimately becomes zero. That means all the events propagate to remote site. + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); + } + + /** + * Normal scenario in which a sender is stopped and then started again on accessor node. 
+ * @throws Exception + */ + @Test + public void testParallelPropagationSenderStartAfterStopOnAccessorNode() throws Throwable { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( + getTestMethodName() + "_PR", "ln", 1, 100)); + vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor( + getTestMethodName() + "_PR", "ln", 1, 100)); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + + createReceiverInVMs(vm2, vm3); + + //make sure all the senders are not running on accessor nodes and running on non-accessor nodes + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm6.invoke(() -> 
WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + //FIRST RUN: now, the senders are started. So, do some of the puts + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); + + //now, stop all of the senders + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + vm6.invoke(() -> WANTestBase.stopSender( "ln" )); + vm7.invoke(() -> WANTestBase.stopSender( "ln" )); + + //SECOND RUN: do some of the puts after the senders are stopped + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); + + //start the senders again + AsyncInvocation vm4start = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); + AsyncInvocation vm5start = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); + AsyncInvocation vm6start = vm6.invokeAsync(() -> WANTestBase.startSender( "ln" )); + AsyncInvocation vm7start = vm7.invokeAsync(() -> WANTestBase.startSender( "ln" )); + vm4start.join(); + vm5start.join(); + vm6start.join(); + vm7start.join(); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + + //Region size on remote site should remain same and below the number of puts done in the FIRST RUN + vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); + + //SECOND RUN: do some more puts + AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + async.join(); + + //verify all buckets drained only on non-accessor nodes. 
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + //verify the events propagate to remote site + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); + } + + + /** + * Normal scenario in which a combinations of start, pause, resume operations + * is tested + */ + @Test + public void testStartPauseResumeParallelGatewaySender() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + LogWriterUtils.getLogWriter().info("Created cache on local site"); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY )); + + LogWriterUtils.getLogWriter().info("Created senders on local site"); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + LogWriterUtils.getLogWriter().info("Created 
PRs on local site"); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + LogWriterUtils.getLogWriter().info("Created PRs on remote site"); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + LogWriterUtils.getLogWriter().info("Done 1000 puts on local site"); + + //Since puts are already done on userPR, it will have the buckets created. + //During sender start, it will wait until those buckets are created for shadowPR as well. + //Start the senders in async threads, so colocation of shadowPR will be complete and + //missing buckets will be created in PRHARedundancyProvider.createMissingBuckets(). + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + LogWriterUtils.getLogWriter().info("Started senders on local site"); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); + LogWriterUtils.getLogWriter().info("Done 5000 puts on local site"); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + LogWriterUtils.getLogWriter().info("Paused senders on local site"); + + vm4.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); + vm5.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); + vm6.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); + vm7.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + 
"_PR", 1000 )); + LogWriterUtils.getLogWriter().info("Started 1000 async puts on local site"); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + LogWriterUtils.getLogWriter().info("Resumed senders on local site"); + + vm4.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); + vm5.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); + vm6.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); + vm7.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); + + try { + inv1.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail("Interrupted the async invocation."); + } + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 5000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 5000 )); + } +}
