http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java deleted file mode 100644 index 427054a..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.parallel; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.Wait; - -@Category(DistributedTest.class) -public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase { - - private static final long serialVersionUID = 1L; - - public ParallelWANPropagationConcurrentOpsDUnitTest() { - super(); - } - - /** - * Normal propagation scenario test case for a PR with only one bucket. - * This has been added for bug# 44284. - * @throws Exception - */ - @Test - public void testParallelPropagationWithSingleBucketPR() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 1, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 1, isOffHeap() )); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - //pause the senders - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - - Wait.pause(5000); - - AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 )); - AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 )); - AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - async1.join(); - async2.join(); - async3.join(); - async4.join(); - - int queueSize = (Integer) vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" )); - assertEquals("Actual queue size is not matching with the expected", 3500, queueSize); - - //resume the senders now - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - /** - * Normal propagation scenario test case for a PR with less number of buckets. - * Buckets have been kept to 10 for this test. - * This has been added for bug# 44287. - * @throws Exception - */ - @Test - public void testParallelPropagationWithLowNumberOfBuckets() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 10, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 10, isOffHeap() )); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 )); - AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 )); - AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - async1.join(); - async2.join(); - async3.join(); - async4.join(); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testParallelQueueDrainInOrder_PR() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 3, 4, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR")); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.addQueueListener( "ln", true)); - vm5.invoke(() -> WANTestBase.addQueueListener( "ln", true)); - vm6.invoke(() -> WANTestBase.addQueueListener( "ln", true)); - vm7.invoke(() -> WANTestBase.addQueueListener( "ln", true)); - - Wait.pause(2000); - vm4.invoke(() -> WANTestBase.pauseSender( "ln")); - vm5.invoke(() -> WANTestBase.pauseSender( "ln")); - vm6.invoke(() -> WANTestBase.pauseSender( "ln")); - vm7.invoke(() -> WANTestBase.pauseSender( "ln")); - - Wait.pause(2000); - - vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 4 )); - vm4.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4)); - vm5.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4)); - vm6.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4 )); - vm7.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4)); - - vm4.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4)); - vm5.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4)); - vm6.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4 )); - vm7.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4)); - - vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - vm6.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - - HashMap vm4BRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); - HashMap vm5BRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); - HashMap vm6BRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); - HashMap vm7BRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); - - List b0SenderUpdates = (List)vm4BRUpdates.get("Create0"); - List b1SenderUpdates = (List)vm4BRUpdates.get("Create1"); - List b2SenderUpdates = (List)vm4BRUpdates.get("Create2"); - List b3SenderUpdates = (List)vm4BRUpdates.get("Create3"); - - HashMap vm4QueueBRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); - HashMap vm5QueueBRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); - HashMap vm6QueueBRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); - HashMap vm7QueueBRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); - - assertEquals(vm4QueueBRUpdates, vm5QueueBRUpdates); - assertEquals(vm4QueueBRUpdates, vm6QueueBRUpdates); - assertEquals(vm4QueueBRUpdates, vm7QueueBRUpdates); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln")); - vm5.invoke(() -> WANTestBase.resumeSender( "ln")); - vm6.invoke(() -> WANTestBase.resumeSender( "ln")); - vm7.invoke(() -> WANTestBase.resumeSender( "ln")); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - HashMap receiverUpdates = (HashMap)vm2.invoke(() -> WANTestBase.checkPR( - getTestMethodName() + "_PR")); - List<Long> createList = (List)receiverUpdates.get("Create"); - ArrayList<Long> b0ReceiverUpdates = new ArrayList<Long>(); - ArrayList<Long> b1ReceiverUpdates = new ArrayList<Long>(); - ArrayList<Long> b2ReceiverUpdates = new ArrayList<Long>(); - ArrayList<Long> b3ReceiverUpdates = new ArrayList<Long>(); - for (Long key : createList) { - long mod = key % 4; - if (mod == 0) { - b0ReceiverUpdates.add(key); - } - else if (mod == 1) { - b1ReceiverUpdates.add(key); - } - else if (mod == 2) { - b2ReceiverUpdates.add(key); - } - else if (mod == 3) { - b3ReceiverUpdates.add(key); - } - } - b0ReceiverUpdates.remove(0); - b1ReceiverUpdates.remove(0); - b2ReceiverUpdates.remove(0); - b3ReceiverUpdates.remove(0); - - assertEquals(b0SenderUpdates, b0ReceiverUpdates); - assertEquals(b1SenderUpdates, b1ReceiverUpdates); - assertEquals(b2SenderUpdates, b2ReceiverUpdates); - assertEquals(b3SenderUpdates, b3ReceiverUpdates); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java deleted file mode 100644 index 7b0cf3e..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java +++ /dev/null @@ -1,1234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.parallel; - -import org.junit.Ignore; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.Set; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.EntryExistsException; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.client.ServerOperationException; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.RegionQueue; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.BatchException70; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -@Category(DistributedTest.class) -public class ParallelWANPropagationDUnitTest extends WANTestBase { - private static final long serialVersionUID = 1L; - - public ParallelWANPropagationDUnitTest() { - super(); - } - - @Test - public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCache(lnPort); - createSender("ln", 2, true, 100, 300, false, false, - null, true); - createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()); - - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals("ln")) { - sender = s; - break; - } - } - try { - sender.start(); - } catch (Exception e) { - e.printStackTrace(); - fail("Failed with IOException"); - } - - GemFireCacheImpl gemCache = (GemFireCacheImpl)cache; - Set regionSet = gemCache.rootRegions(); - - for (Object r : regionSet) { - if (((Region)r).getName().equals( - ((AbstractGatewaySender)sender).getQueues().toArray(new RegionQueue[1])[0].getRegion().getName())) { - fail("The shadowPR is exposed to the user"); - } - } - } - - @Test - public void testParallelPropagation_withoutRemoteSite() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //keep a larger batch to minimize number of exception occurrences in the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 300, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 300, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 300, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 300, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm6.invoke(createPartitionedRegionRedundancy1Runnable()); - vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - //make sure all the senders are running before doing any puts - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - createReceiverInVMs(vm2, vm3); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - // Just making sure that though the remote site is started later, - // remote site is still able to get the data. Since the receivers are - // started before creating partition region it is quite possible that the - // region may loose some of the events. This needs to be handled by the code - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - /** - * Normal happy scenario test case. - * @throws Exception - */ - @Test - public void testParallelPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm6.invoke(createPartitionedRegionRedundancy1Runnable()); - vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - protected SerializableRunnableIF createReceiverPartitionedRegionRedundancy1() { - return () -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ); - } - - protected SerializableRunnableIF createPartitionedRegionRedundancy1Runnable() { - return () -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ); - } - - protected SerializableRunnableIF waitForSenderRunnable() { - return () -> WANTestBase.waitForSenderRunningState( "ln" ); - } - - @Test - public void testParallelPropagation_ManualStart() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, false )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, false )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, false )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, false )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm6.invoke(createPartitionedRegionRedundancy1Runnable()); - vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - /** - * Normal happy scenario test case2. - * @throws Exception - */ - @Test - public void testParallelPropagationPutBeforeSenderStart() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm6.invoke(createPartitionedRegionRedundancy1Runnable()); - vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - /** - * Local and remote sites are up and running. - * Local site cache is closed and the site is built again. - * Puts are done to local site. - * Expected: Remote site should receive all the events put after the local - * site was built back. - * - * @throws Exception - */ - @Test - public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm6.invoke(createPartitionedRegionRedundancy1Runnable()); - vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - //-------------------Close and rebuild local site --------------------------------- - - vm4.invoke(() -> WANTestBase.killSender()); - vm5.invoke(() -> WANTestBase.killSender()); - vm6.invoke(() -> WANTestBase.killSender()); - vm7.invoke(() -> WANTestBase.killSender()); - - Integer regionSize = - (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" )); - LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm6.invoke(createPartitionedRegionRedundancy1Runnable()); - vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - //------------------------------------------------------------------------------------ - - IgnoredException.addIgnoredException(EntryExistsException.class.getName()); - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testParallelColocatedPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - } - /** - * Create colocated partitioned regions. - * Parent region has PGS attached and child region doesn't. - * - * Validate that events for parent region reaches remote site. - * - * @throws Exception - */ - - @Test - public void testParallelColocatedPropagation2() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"_child1", 1000 )); - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"_child2", 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName()+"_child1", 0 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName()+"_child2", 0 )); - } - - - @Test - public void testParallelPropagationWithOverflow() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - //let all the senders start before doing any puts to ensure that none of the events is lost - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), 150 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 150 )); - } - - @Test - public void testSerialReplicatedAndParallelPartitionedPropagation() - throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "lnSerial", - 2, false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnSerial", - 2, false, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createSender( "lnParallel", - 2, true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnParallel", - 2, true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "lnParallel", - 2, true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "lnParallel", - 2, true, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() )); - - startSenderInVMs("lnSerial", vm4, vm5); - - startSenderInVMs("lnParallel", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testPartitionedParallelPropagationToTwoWanSites() - throws Exception { - Integer lnPort = createFirstLocatorWithDSId(1); - Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - createReceiverInVMs(vm2); - - createCacheInVMs(tkPort, vm3); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - createReceiverInVMs(vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "lnParallel1", - 2, true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnParallel1", - 2, true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "lnParallel1", - 2, true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "lnParallel1", - 2, true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createSender( "lnParallel2", - 3, true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnParallel2", - 3, true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "lnParallel2", - 3, true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "lnParallel2", - 3, true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() )); - - startSenderInVMs("lnParallel1", vm4, vm5, vm6, vm7); - - startSenderInVMs("lnParallel2", vm4, vm5, vm6, vm7); - - - //before doing puts, make sure that the senders are started. - //this will ensure that not a single events is lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" )); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" )); - - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1")); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Category(FlakyTest.class) // GEODE-1008 and GEODE-1180: random ports, async actions, thread sleeps, time sensitive, waitForCriterion - @Test - public void testPartitionedParallelPropagationHA() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - - AsyncInvocation inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); - Wait.pause(500); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); - AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 )); - Wait.pause(1500); - AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); - inv1.join(); - inv2.join(); - inv3.join(); - inv4.join(); - - vm6.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - vm7.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - - //verify all buckets drained on the sender nodes that up and running. - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 10000 )); - } - - @Test - public void testParallelPropagationWithFilter() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - - //wait for senders to be running before doing any puts. This will ensure that - //not a single events is lost - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - } - - - @Test - public void testParallelPropagationWithPutAll() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm6.invoke(createPartitionedRegionRedundancy1Runnable()); - vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.doPutAll( getTestMethodName() + "_PR", - 100 , 50 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 5000 )); - - } - - /** - * There was a bug that all destroy events were being put into different buckets of sender queue - * against the key 0. Bug# 44304 - * - * @throws Exception - */ - @Test - public void testParallelPropagationWithDestroy() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 100, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 100, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 100, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 100, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - vm6.invoke(createPartitionedRegionRedundancy1Runnable()); - vm7.invoke(createPartitionedRegionRedundancy1Runnable()); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - vm6.invoke(waitForSenderRunnable()); - vm7.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - - Wait.pause(2000); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - vm4.invoke(() -> WANTestBase.doDestroys( getTestMethodName() + "_PR", 500 )); - - - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 )); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 )); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 )); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 )); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); - - //give some time for the queue to drain - Wait.pause(5000); - - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 500 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 500 )); - - } - - /** - * Normal happy scenario test case. But with Tx operations - * @throws Exception - */ - @Test - public void testParallelPropagationTxOperations() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5); - //vm6.invoke(() -> WANTestBase.createCache( lnPort )); - //vm7.invoke(() -> WANTestBase.createCache( lnPort )); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - //vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - // true, 100, 10, false, false, null, true )); - //vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - // true, 100, 10, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); -// vm6.invoke(() -> WANTestBase.createPartitionedRegion( -// testName + "_PR", "ln", true, 1, 100, isOffHeap() )); -// vm7.invoke(() -> WANTestBase.createPartitionedRegion( -// testName + "_PR", "ln", true, 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); -// vm6.invoke(() -> WANTestBase.startSender( "ln" )); -// vm7.invoke(() -> WANTestBase.startSender( "ln" )); - - vm2.invoke(createReceiverPartitionedRegionRedundancy1()); - vm3.invoke(createReceiverPartitionedRegionRedundancy1()); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); -// vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); -// vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm4.invoke(() -> WANTestBase.doTxPuts( getTestMethodName() + "_PR")); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); -// vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); -// vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 3 )); - } - - @Ignore - @Test - public void testParallelGatewaySenderQueueLocalSize() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - createCacheInVMs(nyPort, vm2); - createReceiverInVMs(vm2); - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - - /* - * Remember pausing sender does not guarantee that peek will be paused - * immediately as its quite possible event processor is already in peeking - * events and send them after peeking without a check for pause. hence below - * pause of 1 sec to allow dispatching to be paused - */ -// vm4.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" )); -// vm5.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" )); - Wait.pause(1000); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 10 )); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); - - // instead of checking size as 5 and 5. check that combined size is 10 - Integer localSize1 = (Integer)vm4.invoke(() -> WANTestBase.getPRQLocalSize( "ln")); - Integer localSize2 = (Integer)vm5.invoke(() -> WANTestBase.getPRQLocalSize( "ln")); - assertEquals(10, localSize1 + localSize2); - } - - - - public void tParallelGatewaySenderQueueLocalSizeWithHA() { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - createReceiverInVMs(vm2); - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(createPartitionedRegionRedundancy1Runnable()); - vm5.invoke(createPartitionedRegionRedundancy1Runnable()); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - - /* - * Remember pausing sender does not guarantee that peek will be paused - * immediately as its quite possible event processor is already in peeking - * events and send them after peeking without a check for pause. hence below - * pause of 1 sec to allow dispatching to be paused - */ -// vm4.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" )); -// vm5.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" )); - Wait.pause(1000); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 10 )); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); - - Integer localSize1 = (Integer)vm4.invoke(() -> WANTestBase.getPRQLocalSize( "ln")); - Integer localSize2 = (Integer)vm5.invoke(() -> WANTestBase.getPRQLocalSize( "ln")); - assertEquals(10, localSize1 + localSize2); - - vm5.invoke(() -> WANTestBase.killSender( )); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); - vm4.invoke(() -> WANTestBase.checkPRQLocalSize( "ln", 10 )); - - } - - /** - * Added for defect #50364 Can't colocate region that has AEQ with a region that does not have that same AEQ - */ - @Test - public void testParallelSenderAttachedToChildRegionButNotToParentRegion() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //create cache and receiver on site2 - createCacheInVMs(nyPort, vm2); - createReceiverInVMs(vm2); - //create cache on site1 - createCacheInVMs(lnPort, vm3); - - //create sender on site1 - vm3.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - //start sender on site1 - startSenderInVMs("ln", vm3); - - //create leader (parent) PR on site1 - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() )); - String parentRegionFullPath = - (String) vm3.invoke(() -> WANTestBase.getRegionFullPath( getTestMethodName() + "PARENT_PR")); - - //create colocated (child) PR on site1 - vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegion( - getTestMethodName() + "CHILD_PR", "ln", 0, 100, parentRegionFullPath )); - - //create leader and colocated PR on site2 - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegion( - getTestMethodName() + "CHILD_PR", null, 0, 100, parentRegionFullPath )); - - //do puts in colocated (child) PR on site1 - vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "CHILD_PR", 1000 )); - - //verify the puts reach site2 - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "CHILD_PR", 1000 )); - } - - @Test - public void testParallelPropagationWithFilter_AfterAck() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm6, vm7); - createReceiverInVMs(vm6, vm7); - - createCacheInVMs(lnPort, vm2, vm3, vm4, vm5); - - vm2.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )); - vm3.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )); - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true, - 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm2, vm3, vm4, vm5); - - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - - // wait for senders to be running before doing any puts. This will ensure - // that - // not a single events is lost - vm2.invoke(waitForSenderRunnable()); - vm3.invoke(waitForSenderRunnable()); - vm4.invoke(waitForSenderRunnable()); - vm5.invoke(waitForSenderRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - vm2.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - vm3.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - - vm6.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - vm7.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - - Integer vm2Acks = (Integer)vm2.invoke(() -> WANTestBase.validateAfterAck( "ln")); - Integer vm3Acks = (Integer)vm3.invoke(() -> WANTestBase.validateAfterAck( "ln")); - Integer vm4Acks = (Integer)vm4.invoke(() -> WANTestBase.validateAfterAck( "ln")); - Integer vm5Acks = (Integer)vm5.invoke(() -> WANTestBase.validateAfterAck( "ln")); - - assertEquals(2000, (vm2Acks + vm3Acks + vm4Acks + vm5Acks)); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java deleted file mode 100644 index 07a6223..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java +++ /dev/null @@ -1,415 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.parallel; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.Wait; - -@Category(DistributedTest.class) -public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase { - - private static final long serialVersionUID = 1L; - - public ParallelWANPropagationLoopBackDUnitTest() { - super(); - } - - /** - * Test loop back issue between 2 WAN sites (LN & NY). LN -> NY -> LN. - * Site1 (LN): vm2, vm4, vm5 - * Site2 (NY): vm3, vm6, vm7 - */ - @Test - public void testParallelPropagationLoopBack() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //create receiver on site1 and site2 - createCacheInVMs(lnPort, vm2, vm4, vm5); - vm2.invoke(() -> WANTestBase.createReceiver()); - createCacheInVMs(nyPort, vm3, vm6, vm7); - vm3.invoke(() -> WANTestBase.createReceiver()); - - //create senders on site1 - vm2.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - //create senders on site2 - vm3.invoke(() -> WANTestBase.createSender( "ny", 1, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ny", 1, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ny", 1, - true, 100, 10, false, false, null, true )); - - //create PR on site1 - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); - - //create PR on site2 - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); - - //start sender on site1 - startSenderInVMs("ln", vm2, vm4, vm5); - - - //start sender on site2 - startSenderInVMs("ny", vm3, vm6, vm7); - - - //pause senders on site1 - vm2.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - - //pause senders on site2 - vm3.invoke(() -> WANTestBase.pauseSender( "ny" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ny" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ny" )); - - //this is required since sender pause doesn't take effect immediately - Wait.pause(1000); - - //Do 100 puts on site1 - vm2.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 100 )); - //do next 100 puts on site2 - vm3.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", - 100, 200 )); - //verify queue size on both sites - vm2.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 )); - vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 )); - vm5.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 )); - - vm3.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); - vm6.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); - vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); - - //resume sender on site1 - vm2.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - - //validate events reached site2 from site1 - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - - //on site2, verify queue size again - //this ensures that loopback is not happening since the queue size is same as before - //the event coming from site1 are not enqueued again - vm3.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); - vm6.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); - vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); - - //resume sender on site2 - vm3.invoke(() -> WANTestBase.resumeSender( "ny" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ny" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ny" )); - - //validate region size on both the sites - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - } - - /** - * Test loop back issue among 3 WAN sites with Ring topology i.e. LN -> NY -> TK -> LN - * Site1 (LN): vm3, vm6 - * Site2 (NY): vm4, vm7 - * Site3 (TK): vm5 - */ - @Test - public void testParallelPropagationLoopBack3Sites() { - //Create locators - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - //create cache and receivers on all the 3 sites - createCacheInVMs(lnPort, vm3, vm6); - createReceiverInVMs(vm3, vm6); - createCacheInVMs(nyPort, vm4, vm7); - createReceiverInVMs(vm4, vm7); - createCacheInVMs(tkPort, vm5); - createReceiverInVMs(vm5); - - - //create senders on all the 3 sites - vm3.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createSender( "ny", 3, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ny", 3, - true, 100, 10, false, false, null, true )); - - vm5.invoke(() -> WANTestBase.createSender( "tk", 1, - true, 100, 10, false, false, null, true )); - - //create PR on the 3 sites - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); - - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "tk", 0, 100, isOffHeap() )); - - //start senders on all the sites - startSenderInVMs("ln", vm3, vm6); - - startSenderInVMs("ny", vm4, vm7); - - vm5.invoke(() -> WANTestBase.startSender( "tk" )); - - //pause senders on site1 and site3. Site2 has the sender running to pass along events - vm3.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - - vm5.invoke(() -> WANTestBase.pauseSender( "tk" )); - - //need to have this pause since pauseSender doesn't take effect immediately - Wait.pause(1000); - - //do puts on site1 - vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 100 )); - - //do more puts on site3 - vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", - 100, 200 )); - - //verify queue size on site1 and site3 - vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 )); - vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 )); - - //resume sender on site1 so that events reach site2 and from there to site3 - vm3.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - - //validate region size on site2 (should have 100) and site3 (should have 200) - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 100 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - - //verify queue size remains same on site3 which means event loopback did not happen - //this means events coming from site1 are not enqueued back into the sender - vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 )); - - //resume sender on site3 - vm5.invoke(() -> WANTestBase.resumeSender( "tk" )); - - //validate region size - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - } - - /** - * Test loop back issue among 3 WAN sites with N to N topology - * i.e. each site connected to all other sites. - * Puts are done to only one DS. - * LN site: vm3, vm6 - * NY site: vm4, vm7 - * TK site: vm5 - */ - @Test - public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - createCacheInVMs(lnPort, vm3, vm6); - createCacheInVMs(nyPort, vm4, vm7); - createCacheInVMs(tkPort, vm5); - vm3.invoke(() -> WANTestBase.createReceiver()); - vm4.invoke(() -> WANTestBase.createReceiver()); - vm5.invoke(() -> WANTestBase.createReceiver()); - - //site1 - vm3.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln1", 2, - true, 100, 10, false, false, null, true )); - - vm3.invoke(() -> WANTestBase.createSender( "ln2", 3, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln2", 3, - true, 100, 10, false, false, null, true )); - - //site2 - vm4.invoke(() -> WANTestBase.createSender( "ny1", 1, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ny1", 1, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createSender( "ny2", 3, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ny2", 3, - true, 100, 10, false, false, null, true )); - - //site3 - vm5.invoke(() -> WANTestBase.createSender( "tk1", 1, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "tk2", 2, - true, 100, 10, false, false, null, true )); - - //create PR - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln1,ln2", 0, 1, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln1,ln2", 0, 1, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ny1,ny2", 0, 1, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ny1,ny2", 0, 1, isOffHeap() )); - - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "tk1,tk2", 0, 1, isOffHeap() )); - - //start all the senders - vm3.invoke(() -> WANTestBase.startSender( "ln1" )); - vm3.invoke(() -> WANTestBase.startSender( "ln2" )); - vm6.invoke(() -> WANTestBase.startSender( "ln1" )); - vm6.invoke(() -> WANTestBase.startSender( "ln2" )); - - vm4.invoke(() -> WANTestBase.startSender( "ny1" )); - vm4.invoke(() -> WANTestBase.startSender( "ny2" )); - vm7.invoke(() -> WANTestBase.startSender( "ny1" )); - vm7.invoke(() -> WANTestBase.startSender( "ny2" )); - - vm5.invoke(() -> WANTestBase.startSender( "tk1" )); - vm5.invoke(() -> WANTestBase.startSender( "tk2" )); - - //pause senders on all the sites - vm3.invoke(() -> WANTestBase.pauseSender( "ln1" )); - vm3.invoke(() -> WANTestBase.pauseSender( "ln2" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln1" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln2" )); - - vm4.invoke(() -> WANTestBase.pauseSender( "ny1" )); - vm4.invoke(() -> WANTestBase.pauseSender( "ny2" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ny1" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ny2" )); - - vm5.invoke(() -> WANTestBase.pauseSender( "tk1" )); - vm5.invoke(() -> WANTestBase.pauseSender( "tk2" )); - - //this is required since sender pause doesn't take effect immediately - Wait.pause(1000); - - //do puts on site1 - vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 100 )); - - //verify queue size on site1 and site3 - vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln1", 100 )); - vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln2", 100 )); - - //resume sender (from site1 to site2) on site1 - vm3.invoke(() -> WANTestBase.resumeSender( "ln1" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln1" )); - - //validate region size on site2 - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 100 )); - - //verify queue size on site2 (sender 2 to 1) - //should remain at 0 as the events from site1 should not go back to site1 - vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 )); - - //verify queue size on site2 (sender 2 to 3) - //should remain at 0 as events from site1 will reach site3 directly..site2 need not send to site3 again - vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 )); - - //do more puts on site3 - vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", - 100, 200 )); - - //resume sender (from site3 to site2) on site3 - vm5.invoke(() -> WANTestBase.resumeSender( "tk2" )); - - //validate region size on site2 - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - - //verify queue size on site2 (sender 2 to 3) - //should remain at 0 as the events from site3 should not go back to site3 - vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 )); - - //verify queue size on site2 (sender 2 to 1) - //should remain at 0 as events from site3 will reach site1 directly..site2 need not send to site1 again - vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 )); - - //resume all senders - vm3.invoke(() -> WANTestBase.resumeSender( "ln2" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln2" )); - - vm4.invoke(() -> WANTestBase.resumeSender( "ny1" )); - vm4.invoke(() -> WANTestBase.resumeSender( "ny2" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ny1" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ny2" )); - - vm5.invoke(() -> WANTestBase.resumeSender( "tk1" )); - - //validate region size on all sites - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 200 )); - } - - - -}
