http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java new file mode 100644 index 0000000..427054a --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.Wait; + +@Category(DistributedTest.class) +public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + public ParallelWANPropagationConcurrentOpsDUnitTest() { + super(); + } + + /** + * Normal propagation scenario test case for a PR with only one bucket. + * This has been added for bug# 44284. + * @throws Exception + */ + @Test + public void testParallelPropagationWithSingleBucketPR() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 1, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 1, isOffHeap() )); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + //pause the senders + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + + Wait.pause(5000); + + AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 )); + AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 )); + AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + async1.join(); + async2.join(); + async3.join(); + async4.join(); + + int queueSize = (Integer) vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" )); + assertEquals("Actual queue size is not matching with the expected", 3500, queueSize); + + //resume the senders now + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + /** + * Normal propagation scenario test case for a PR with less number of buckets. + * Buckets have been kept to 10 for this test. + * This has been added for bug# 44287. + * @throws Exception + */ + @Test + public void testParallelPropagationWithLowNumberOfBuckets() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 10, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 10, isOffHeap() )); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 )); + AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 )); + AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + async1.join(); + async2.join(); + async3.join(); + async4.join(); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testParallelQueueDrainInOrder_PR() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 3, 4, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR")); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.addQueueListener( "ln", true)); + vm5.invoke(() -> WANTestBase.addQueueListener( "ln", true)); + vm6.invoke(() -> WANTestBase.addQueueListener( "ln", true)); + vm7.invoke(() -> WANTestBase.addQueueListener( "ln", true)); + + Wait.pause(2000); + vm4.invoke(() -> WANTestBase.pauseSender( "ln")); + vm5.invoke(() -> WANTestBase.pauseSender( "ln")); + vm6.invoke(() -> WANTestBase.pauseSender( "ln")); + vm7.invoke(() -> WANTestBase.pauseSender( "ln")); + + Wait.pause(2000); + + vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 4 )); + vm4.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4)); + vm5.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4)); + vm6.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4 )); + vm7.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4)); + + vm4.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4)); + vm5.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4)); + vm6.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4 )); + vm7.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4)); + + vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm6.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + + HashMap vm4BRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); + HashMap vm5BRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); + HashMap vm6BRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); + HashMap vm7BRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); + + List b0SenderUpdates = (List)vm4BRUpdates.get("Create0"); + List b1SenderUpdates = (List)vm4BRUpdates.get("Create1"); + List b2SenderUpdates = (List)vm4BRUpdates.get("Create2"); + List b3SenderUpdates = (List)vm4BRUpdates.get("Create3"); + + HashMap vm4QueueBRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); + HashMap vm5QueueBRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); + HashMap vm6QueueBRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); + HashMap vm7QueueBRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkQueue_BR("ln", 4)); + + assertEquals(vm4QueueBRUpdates, vm5QueueBRUpdates); + assertEquals(vm4QueueBRUpdates, vm6QueueBRUpdates); + assertEquals(vm4QueueBRUpdates, vm7QueueBRUpdates); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln")); + vm5.invoke(() -> WANTestBase.resumeSender( "ln")); + vm6.invoke(() -> WANTestBase.resumeSender( "ln")); + vm7.invoke(() -> WANTestBase.resumeSender( "ln")); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + HashMap receiverUpdates = (HashMap)vm2.invoke(() -> WANTestBase.checkPR( + getTestMethodName() + "_PR")); + List<Long> createList = (List)receiverUpdates.get("Create"); + ArrayList<Long> b0ReceiverUpdates = new ArrayList<Long>(); + ArrayList<Long> b1ReceiverUpdates = new ArrayList<Long>(); + ArrayList<Long> b2ReceiverUpdates = new ArrayList<Long>(); + ArrayList<Long> b3ReceiverUpdates = new ArrayList<Long>(); + for (Long key : createList) { + long mod = key % 4; + if (mod == 0) { + b0ReceiverUpdates.add(key); + } + else if (mod == 1) { + b1ReceiverUpdates.add(key); + } + else if (mod == 2) { + b2ReceiverUpdates.add(key); + } + else if (mod == 3) { + b3ReceiverUpdates.add(key); + } + } + b0ReceiverUpdates.remove(0); + b1ReceiverUpdates.remove(0); + b2ReceiverUpdates.remove(0); + b3ReceiverUpdates.remove(0); + + assertEquals(b0SenderUpdates, b0ReceiverUpdates); + assertEquals(b1SenderUpdates, b1ReceiverUpdates); + assertEquals(b2SenderUpdates, b2ReceiverUpdates); + assertEquals(b3SenderUpdates, b3ReceiverUpdates); + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java new file mode 100644 index 0000000..7b0cf3e --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java @@ -0,0 +1,1234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; + +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.Set; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.EntryExistsException; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.RegionQueue; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.BatchException70; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +@Category(DistributedTest.class) +public class ParallelWANPropagationDUnitTest extends WANTestBase { + private static final long serialVersionUID = 1L; + + public ParallelWANPropagationDUnitTest() { + super(); + } + + @Test + public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCache(lnPort); + createSender("ln", 2, true, 100, 300, false, false, + null, true); + createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()); + + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals("ln")) { + sender = s; + break; + } + } + try { + sender.start(); + } catch (Exception e) { + e.printStackTrace(); + fail("Failed with IOException"); + } + + GemFireCacheImpl gemCache = (GemFireCacheImpl)cache; + Set regionSet = gemCache.rootRegions(); + + for (Object r : regionSet) { + if (((Region)r).getName().equals( + ((AbstractGatewaySender)sender).getQueues().toArray(new RegionQueue[1])[0].getRegion().getName())) { + fail("The shadowPR is exposed to the user"); + } + } + } + + @Test + public void testParallelPropagation_withoutRemoteSite() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //keep a larger batch to minimize number of exception occurrences in the log + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 300, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 300, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 300, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 300, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + //make sure all the senders are running before doing any puts + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + createReceiverInVMs(vm2, vm3); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + // Just making sure that though the remote site is started later, + // remote site is still able to get the data. Since the receivers are + // started before creating partition region it is quite possible that the + // region may loose some of the events. This needs to be handled by the code + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + /** + * Normal happy scenario test case. + * @throws Exception + */ + @Test + public void testParallelPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + protected SerializableRunnableIF createReceiverPartitionedRegionRedundancy1() { + return () -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ); + } + + protected SerializableRunnableIF createPartitionedRegionRedundancy1Runnable() { + return () -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ); + } + + protected SerializableRunnableIF waitForSenderRunnable() { + return () -> WANTestBase.waitForSenderRunningState( "ln" ); + } + + @Test + public void testParallelPropagation_ManualStart() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, false )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, false )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, false )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, false )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); + + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + /** + * Normal happy scenario test case2. + * @throws Exception + */ + @Test + public void testParallelPropagationPutBeforeSenderStart() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + /** + * Local and remote sites are up and running. + * Local site cache is closed and the site is built again. + * Puts are done to local site. + * Expected: Remote site should receive all the events put after the local + * site was built back. + * + * @throws Exception + */ + @Test + public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + //-------------------Close and rebuild local site --------------------------------- + + vm4.invoke(() -> WANTestBase.killSender()); + vm5.invoke(() -> WANTestBase.killSender()); + vm6.invoke(() -> WANTestBase.killSender()); + vm7.invoke(() -> WANTestBase.killSender()); + + Integer regionSize = + (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" )); + LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + //------------------------------------------------------------------------------------ + + IgnoredException.addIgnoredException(EntryExistsException.class.getName()); + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testParallelColocatedPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + } + /** + * Create colocated partitioned regions. + * Parent region has PGS attached and child region doesn't. + * + * Validate that events for parent region reaches remote site. + * + * @throws Exception + */ + + @Test + public void testParallelColocatedPropagation2() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"_child1", 1000 )); + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"_child2", 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName()+"_child1", 0 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName()+"_child2", 0 )); + } + + + @Test + public void testParallelPropagationWithOverflow() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + //let all the senders start before doing any puts to ensure that none of the events is lost + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), 150 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 150 )); + } + + @Test + public void testSerialReplicatedAndParallelPartitionedPropagation() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial", + 2, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial", + 2, false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createSender( "lnParallel", + 2, true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnParallel", + 2, true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "lnParallel", + 2, true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "lnParallel", + 2, true, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() )); + + startSenderInVMs("lnSerial", vm4, vm5); + + startSenderInVMs("lnParallel", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testPartitionedParallelPropagationToTwoWanSites() + throws Exception { + Integer lnPort = createFirstLocatorWithDSId(1); + Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + createReceiverInVMs(vm2); + + createCacheInVMs(tkPort, vm3); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + createReceiverInVMs(vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnParallel1", + 2, true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnParallel1", + 2, true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "lnParallel1", + 2, true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "lnParallel1", + 2, true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createSender( "lnParallel2", + 3, true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnParallel2", + 3, true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "lnParallel2", + 3, true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "lnParallel2", + 3, true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() )); + + startSenderInVMs("lnParallel1", vm4, vm5, vm6, vm7); + + startSenderInVMs("lnParallel2", vm4, vm5, vm6, vm7); + + + //before doing puts, make sure that the senders are started. + //this will ensure that not a single events is lost + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" )); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" )); + + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1")); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Category(FlakyTest.class) // GEODE-1008 and GEODE-1180: random ports, async actions, thread sleeps, time sensitive, waitForCriterion + @Test + public void testPartitionedParallelPropagationHA() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + + AsyncInvocation inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); + Wait.pause(500); + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 )); + Wait.pause(1500); + AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender()); + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + + vm6.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + vm7.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + + //verify all buckets drained on the sender nodes that up and running. + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 10000 )); + } + + @Test + public void testParallelPropagationWithFilter() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + + //wait for senders to be running before doing any puts. This will ensure that + //not a single events is lost + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + } + + + @Test + public void testParallelPropagationWithPutAll() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doPutAll( getTestMethodName() + "_PR", + 100 , 50 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 5000 )); + + } + + /** + * There was a bug that all destroy events were being put into different buckets of sender queue + * against the key 0. Bug# 44304 + * + * @throws Exception + */ + @Test + public void testParallelPropagationWithDestroy() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 100, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 100, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 100, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 100, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + vm6.invoke(createPartitionedRegionRedundancy1Runnable()); + vm7.invoke(createPartitionedRegionRedundancy1Runnable()); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + vm6.invoke(waitForSenderRunnable()); + vm7.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); + + Wait.pause(2000); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + vm4.invoke(() -> WANTestBase.doDestroys( getTestMethodName() + "_PR", 500 )); + + + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 )); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 )); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 )); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 )); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + + //give some time for the queue to drain + Wait.pause(5000); + + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 500 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 500 )); + + } + + /** + * Normal happy scenario test case. But with Tx operations + * @throws Exception + */ + @Test + public void testParallelPropagationTxOperations() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5); + //vm6.invoke(() -> WANTestBase.createCache( lnPort )); + //vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + //vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + // true, 100, 10, false, false, null, true )); + //vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + // true, 100, 10, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); +// vm6.invoke(() -> WANTestBase.createPartitionedRegion( +// testName + "_PR", "ln", true, 1, 100, isOffHeap() )); +// vm7.invoke(() -> WANTestBase.createPartitionedRegion( +// testName + "_PR", "ln", true, 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); +// vm6.invoke(() -> WANTestBase.startSender( "ln" )); +// vm7.invoke(() -> WANTestBase.startSender( "ln" )); + + vm2.invoke(createReceiverPartitionedRegionRedundancy1()); + vm3.invoke(createReceiverPartitionedRegionRedundancy1()); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); +// vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); +// vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm4.invoke(() -> WANTestBase.doTxPuts( getTestMethodName() + "_PR")); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); +// vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); +// vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 3 )); + } + + @Ignore + @Test + public void testParallelGatewaySenderQueueLocalSize() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + + /* + * Remember pausing sender does not guarantee that peek will be paused + * immediately as its quite possible event processor is already in peeking + * events and send them after peeking without a check for pause. hence below + * pause of 1 sec to allow dispatching to be paused + */ +// vm4.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" )); +// vm5.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" )); + Wait.pause(1000); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 10 )); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); + + // instead of checking size as 5 and 5. check that combined size is 10 + Integer localSize1 = (Integer)vm4.invoke(() -> WANTestBase.getPRQLocalSize( "ln")); + Integer localSize2 = (Integer)vm5.invoke(() -> WANTestBase.getPRQLocalSize( "ln")); + assertEquals(10, localSize1 + localSize2); + } + + + + public void tParallelGatewaySenderQueueLocalSizeWithHA() { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(createPartitionedRegionRedundancy1Runnable()); + vm5.invoke(createPartitionedRegionRedundancy1Runnable()); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + + /* + * Remember pausing sender does not guarantee that peek will be paused + * immediately as its quite possible event processor is already in peeking + * events and send them after peeking without a check for pause. hence below + * pause of 1 sec to allow dispatching to be paused + */ +// vm4.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" )); +// vm5.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" )); + Wait.pause(1000); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 10 )); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); + + Integer localSize1 = (Integer)vm4.invoke(() -> WANTestBase.getPRQLocalSize( "ln")); + Integer localSize2 = (Integer)vm5.invoke(() -> WANTestBase.getPRQLocalSize( "ln")); + assertEquals(10, localSize1 + localSize2); + + vm5.invoke(() -> WANTestBase.killSender( )); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 )); + vm4.invoke(() -> WANTestBase.checkPRQLocalSize( "ln", 10 )); + + } + + /** + * Added for defect #50364 Can't colocate region that has AEQ with a region that does not have that same AEQ + */ + @Test + public void testParallelSenderAttachedToChildRegionButNotToParentRegion() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //create cache and receiver on site2 + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + //create cache on site1 + createCacheInVMs(lnPort, vm3); + + //create sender on site1 + vm3.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + //start sender on site1 + startSenderInVMs("ln", vm3); + + //create leader (parent) PR on site1 + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() )); + String parentRegionFullPath = + (String) vm3.invoke(() -> WANTestBase.getRegionFullPath( getTestMethodName() + "PARENT_PR")); + + //create colocated (child) PR on site1 + vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegion( + getTestMethodName() + "CHILD_PR", "ln", 0, 100, parentRegionFullPath )); + + //create leader and colocated PR on site2 + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegion( + getTestMethodName() + "CHILD_PR", null, 0, 100, parentRegionFullPath )); + + //do puts in colocated (child) PR on site1 + vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "CHILD_PR", 1000 )); + + //verify the puts reach site2 + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "CHILD_PR", 1000 )); + } + + @Test + public void testParallelPropagationWithFilter_AfterAck() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm6, vm7); + createReceiverInVMs(vm6, vm7); + + createCacheInVMs(lnPort, vm2, vm3, vm4, vm5); + + vm2.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )); + vm3.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )); + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true, + 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm2, vm3, vm4, vm5); + + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + + // wait for senders to be running before doing any puts. This will ensure + // that + // not a single events is lost + vm2.invoke(waitForSenderRunnable()); + vm3.invoke(waitForSenderRunnable()); + vm4.invoke(waitForSenderRunnable()); + vm5.invoke(waitForSenderRunnable()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + vm2.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + vm3.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + + vm6.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + vm7.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + + Integer vm2Acks = (Integer)vm2.invoke(() -> WANTestBase.validateAfterAck( "ln")); + Integer vm3Acks = (Integer)vm3.invoke(() -> WANTestBase.validateAfterAck( "ln")); + Integer vm4Acks = (Integer)vm4.invoke(() -> WANTestBase.validateAfterAck( "ln")); + Integer vm5Acks = (Integer)vm5.invoke(() -> WANTestBase.validateAfterAck( "ln")); + + assertEquals(2000, (vm2Acks + vm3Acks + vm4Acks + vm5Acks)); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java new file mode 100644 index 0000000..07a6223 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.Wait; + +@Category(DistributedTest.class) +public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + public ParallelWANPropagationLoopBackDUnitTest() { + super(); + } + + /** + * Test loop back issue between 2 WAN sites (LN & NY). LN -> NY -> LN. + * Site1 (LN): vm2, vm4, vm5 + * Site2 (NY): vm3, vm6, vm7 + */ + @Test + public void testParallelPropagationLoopBack() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //create receiver on site1 and site2 + createCacheInVMs(lnPort, vm2, vm4, vm5); + vm2.invoke(() -> WANTestBase.createReceiver()); + createCacheInVMs(nyPort, vm3, vm6, vm7); + vm3.invoke(() -> WANTestBase.createReceiver()); + + //create senders on site1 + vm2.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + //create senders on site2 + vm3.invoke(() -> WANTestBase.createSender( "ny", 1, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ny", 1, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ny", 1, + true, 100, 10, false, false, null, true )); + + //create PR on site1 + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); + + //create PR on site2 + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); + + //start sender on site1 + startSenderInVMs("ln", vm2, vm4, vm5); + + + //start sender on site2 + startSenderInVMs("ny", vm3, vm6, vm7); + + + //pause senders on site1 + vm2.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + + //pause senders on site2 + vm3.invoke(() -> WANTestBase.pauseSender( "ny" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ny" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ny" )); + + //this is required since sender pause doesn't take effect immediately + Wait.pause(1000); + + //Do 100 puts on site1 + vm2.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 100 )); + //do next 100 puts on site2 + vm3.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", + 100, 200 )); + //verify queue size on both sites + vm2.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 )); + vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 )); + vm5.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 )); + + vm3.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); + vm6.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); + vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); + + //resume sender on site1 + vm2.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + + //validate events reached site2 from site1 + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + + //on site2, verify queue size again + //this ensures that loopback is not happening since the queue size is same as before + //the event coming from site1 are not enqueued again + vm3.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); + vm6.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); + vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )); + + //resume sender on site2 + vm3.invoke(() -> WANTestBase.resumeSender( "ny" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ny" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ny" )); + + //validate region size on both the sites + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + } + + /** + * Test loop back issue among 3 WAN sites with Ring topology i.e. LN -> NY -> TK -> LN + * Site1 (LN): vm3, vm6 + * Site2 (NY): vm4, vm7 + * Site3 (TK): vm5 + */ + @Test + public void testParallelPropagationLoopBack3Sites() { + //Create locators + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + + //create cache and receivers on all the 3 sites + createCacheInVMs(lnPort, vm3, vm6); + createReceiverInVMs(vm3, vm6); + createCacheInVMs(nyPort, vm4, vm7); + createReceiverInVMs(vm4, vm7); + createCacheInVMs(tkPort, vm5); + createReceiverInVMs(vm5); + + + //create senders on all the 3 sites + vm3.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createSender( "ny", 3, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ny", 3, + true, 100, 10, false, false, null, true )); + + vm5.invoke(() -> WANTestBase.createSender( "tk", 1, + true, 100, 10, false, false, null, true )); + + //create PR on the 3 sites + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )); + + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "tk", 0, 100, isOffHeap() )); + + //start senders on all the sites + startSenderInVMs("ln", vm3, vm6); + + startSenderInVMs("ny", vm4, vm7); + + vm5.invoke(() -> WANTestBase.startSender( "tk" )); + + //pause senders on site1 and site3. Site2 has the sender running to pass along events + vm3.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); + + vm5.invoke(() -> WANTestBase.pauseSender( "tk" )); + + //need to have this pause since pauseSender doesn't take effect immediately + Wait.pause(1000); + + //do puts on site1 + vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 100 )); + + //do more puts on site3 + vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", + 100, 200 )); + + //verify queue size on site1 and site3 + vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 )); + vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 )); + + //resume sender on site1 so that events reach site2 and from there to site3 + vm3.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + + //validate region size on site2 (should have 100) and site3 (should have 200) + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 100 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + + //verify queue size remains same on site3 which means event loopback did not happen + //this means events coming from site1 are not enqueued back into the sender + vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 )); + + //resume sender on site3 + vm5.invoke(() -> WANTestBase.resumeSender( "tk" )); + + //validate region size + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + } + + /** + * Test loop back issue among 3 WAN sites with N to N topology + * i.e. each site connected to all other sites. + * Puts are done to only one DS. + * LN site: vm3, vm6 + * NY site: vm4, vm7 + * TK site: vm5 + */ + @Test + public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + + createCacheInVMs(lnPort, vm3, vm6); + createCacheInVMs(nyPort, vm4, vm7); + createCacheInVMs(tkPort, vm5); + vm3.invoke(() -> WANTestBase.createReceiver()); + vm4.invoke(() -> WANTestBase.createReceiver()); + vm5.invoke(() -> WANTestBase.createReceiver()); + + //site1 + vm3.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln1", 2, + true, 100, 10, false, false, null, true )); + + vm3.invoke(() -> WANTestBase.createSender( "ln2", 3, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln2", 3, + true, 100, 10, false, false, null, true )); + + //site2 + vm4.invoke(() -> WANTestBase.createSender( "ny1", 1, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ny1", 1, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createSender( "ny2", 3, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ny2", 3, + true, 100, 10, false, false, null, true )); + + //site3 + vm5.invoke(() -> WANTestBase.createSender( "tk1", 1, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "tk2", 2, + true, 100, 10, false, false, null, true )); + + //create PR + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln1,ln2", 0, 1, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln1,ln2", 0, 1, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ny1,ny2", 0, 1, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ny1,ny2", 0, 1, isOffHeap() )); + + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "tk1,tk2", 0, 1, isOffHeap() )); + + //start all the senders + vm3.invoke(() -> WANTestBase.startSender( "ln1" )); + vm3.invoke(() -> WANTestBase.startSender( "ln2" )); + vm6.invoke(() -> WANTestBase.startSender( "ln1" )); + vm6.invoke(() -> WANTestBase.startSender( "ln2" )); + + vm4.invoke(() -> WANTestBase.startSender( "ny1" )); + vm4.invoke(() -> WANTestBase.startSender( "ny2" )); + vm7.invoke(() -> WANTestBase.startSender( "ny1" )); + vm7.invoke(() -> WANTestBase.startSender( "ny2" )); + + vm5.invoke(() -> WANTestBase.startSender( "tk1" )); + vm5.invoke(() -> WANTestBase.startSender( "tk2" )); + + //pause senders on all the sites + vm3.invoke(() -> WANTestBase.pauseSender( "ln1" )); + vm3.invoke(() -> WANTestBase.pauseSender( "ln2" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln1" )); + vm6.invoke(() -> WANTestBase.pauseSender( "ln2" )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ny1" )); + vm4.invoke(() -> WANTestBase.pauseSender( "ny2" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ny1" )); + vm7.invoke(() -> WANTestBase.pauseSender( "ny2" )); + + vm5.invoke(() -> WANTestBase.pauseSender( "tk1" )); + vm5.invoke(() -> WANTestBase.pauseSender( "tk2" )); + + //this is required since sender pause doesn't take effect immediately + Wait.pause(1000); + + //do puts on site1 + vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 100 )); + + //verify queue size on site1 and site3 + vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln1", 100 )); + vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln2", 100 )); + + //resume sender (from site1 to site2) on site1 + vm3.invoke(() -> WANTestBase.resumeSender( "ln1" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln1" )); + + //validate region size on site2 + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 100 )); + + //verify queue size on site2 (sender 2 to 1) + //should remain at 0 as the events from site1 should not go back to site1 + vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 )); + + //verify queue size on site2 (sender 2 to 3) + //should remain at 0 as events from site1 will reach site3 directly..site2 need not send to site3 again + vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 )); + + //do more puts on site3 + vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", + 100, 200 )); + + //resume sender (from site3 to site2) on site3 + vm5.invoke(() -> WANTestBase.resumeSender( "tk2" )); + + //validate region size on site2 + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + + //verify queue size on site2 (sender 2 to 3) + //should remain at 0 as the events from site3 should not go back to site3 + vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 )); + + //verify queue size on site2 (sender 2 to 1) + //should remain at 0 as events from site3 will reach site1 directly..site2 need not send to site1 again + vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 )); + + //resume all senders + vm3.invoke(() -> WANTestBase.resumeSender( "ln2" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln2" )); + + vm4.invoke(() -> WANTestBase.resumeSender( "ny1" )); + vm4.invoke(() -> WANTestBase.resumeSender( "ny2" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ny1" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ny2" )); + + vm5.invoke(() -> WANTestBase.resumeSender( "tk1" )); + + //validate region size on all sites + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 200 )); + } + + + +}
