http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java deleted file mode 100644 index 0cb60be..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationDUnitTest.java +++ /dev/null @@ -1,1336 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import static org.junit.Assert.*; - -import java.io.IOException; -import java.util.Map; - -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.CacheException; -import com.gemstone.gemfire.cache.EntryExistsException; -import com.gemstone.gemfire.cache.client.ServerOperationException; -import com.gemstone.gemfire.cache30.CacheSerializableRunnable; -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.internal.cache.wan.BatchException70; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -@Category(DistributedTest.class) -public class SerialWANPropagationDUnitTest extends WANTestBase { - - @Override - public final void postSetUpWANTestBase() throws Exception { - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection refused"); - IgnoredException.addIgnoredException("could not get remote locator information"); - IgnoredException.addIgnoredException("Unexpected IOException"); - } - - /** - * this test is disabled due to a high rate of failure in unit test runs - * see ticket #52190 - */ - @Ignore("TODO: test is disabled because of #52190") - @Test - public void testReplicatedSerialPropagation_withoutRemoteLocator() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //keep the batch size high enough to reduce the number of exceptions in the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 400, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 400, false, false, null, true )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - vm6.invoke(createReplicatedRegionRunnable()); - vm7.invoke(createReplicatedRegionRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - protected SerializableRunnableIF createReplicatedRegionRunnable() { - return () -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() ); - } - - @Test - public void testReplicatedSerialPropagation_withoutRemoteSite() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //keep the batch size high enough to reduce the number of exceptions in the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 400, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 400, false, false, null, true )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - vm6.invoke(createReplicatedRegionRunnable()); - vm7.invoke(createReplicatedRegionRunnable()); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - createCacheInVMs(nyPort, vm2, vm3); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - /** - * Added to reproduce the bug #46595 - */ - @Test - public void testReplicatedSerialPropagationWithoutRemoteSite_defect46595() - throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - // reduce the batch-size so maximum number of batches will be sent - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 5, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 5, false, false, null, true )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - vm6.invoke(createReplicatedRegionRunnable()); - vm7.invoke(createReplicatedRegionRunnable()); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - IgnoredException.addIgnoredException(IOException.class.getName()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 10000 )); - - // pause for some time before starting up the remote site - Wait.pause(10000); - - createCacheInVMs(nyPort, vm2, vm3); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - } - - @Test - public void testReplicatedSerialPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - vm6.invoke(createReplicatedRegionRunnable()); - vm7.invoke(createReplicatedRegionRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testReplicatedSerialPropagationWithLocalSiteClosedAndRebuilt() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - vm6.invoke(createReplicatedRegionRunnable()); - vm7.invoke(createReplicatedRegionRunnable()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - //---------close local site and build again----------------------------------------- - vm4.invoke(() -> WANTestBase.killSender( )); - vm5.invoke(() -> WANTestBase.killSender( )); - vm6.invoke(() -> WANTestBase.killSender( )); - vm7.invoke(() -> WANTestBase.killSender( )); - - Integer regionSize = - (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_RR" )); - LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - vm6.invoke(createReplicatedRegionRunnable()); - vm7.invoke(createReplicatedRegionRunnable()); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - IgnoredException.addIgnoredException(EntryExistsException.class.getName()); - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - //---------------------------------------------------------------------------------- - - //verify remote site receives all the events - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - /** - * Two regions configured with the same sender and put is in progress - * on both the regions. - * One of the two regions is destroyed in the middle. - */ - @Test - public void testReplicatedSerialPropagationWithLocalRegionDestroy() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 20, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 20, false, false, null, true )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - //create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - //do puts in RR_2 in main thread - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 500 )); - //destroy RR_2 after above puts are complete - vm4.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); - - inv1.join(); - - //vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_2", 500 )); - } - - /** - * 1 region and sender configured on local site and 1 region and a - * receiver configured on remote site. Puts to the local region are in progress. - * Remote region is destroyed in the middle. - */ - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 500, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 500, false, false, null, true )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - //destroy RR_1 in remote site - vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_1")); - - inv1.join(); - - //verify that all is well in local site. All the events should be present in local region - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - //assuming some events might have been dispatched before the remote region was destroyed, - //sender's region queue will have events less than 1000 but the queue will not be empty. - //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of - //more in depth validations. - vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty("ln" )); - } - - /** - * Two regions configured in local with the same sender and put is in progress - * on both the regions. Same two regions are configured on remote site as well. - * One of the two regions is destroyed in the middle on remote site. - */ - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy2() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 200, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 200, false, false, null, true )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - //create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - //destroy RR_2 on remote site in the middle - vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); - - //expected exceptions in the logs - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - //start puts in RR_2 in another thread - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - - inv1.join(); - - //though region RR_2 is destroyed, RR_1 should still get all the events put in it - //in local site - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - } - - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy3() - throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - // these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - // these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - // senders are created on local site - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 200, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 200, false, false, null, true )); - - // create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - // create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - - // start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - // create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - // create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - // start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - // start puts in RR_2 in another thread - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); - // destroy RR_2 on remote site in the middle - vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() - + "_RR_2" )); - - inv1.join(); - inv2.join(); - - // though region RR_2 is destroyed, RR_1 should still get all the events put - // in it - // in local site - try { - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - } finally { - System.setProperty( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); - vm4.invoke(new CacheSerializableRunnable("UnSetting system property ") { - public void run2() throws CacheException { - System.setProperty( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); - } - }); - - vm5.invoke(new CacheSerializableRunnable("UnSetting system property ") { - public void run2() throws CacheException { - System.setProperty( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); - } - }); - } - } - - /** - * one region and sender configured on local site and the same region and a - * receiver configured on remote site. Puts to the local region are in progress. - * Receiver on remote site is stopped in the middle by closing remote site cache. - */ - @Test - public void testReplicatedSerialPropagationWithRemoteReceiverStopped() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site. Batch size is kept to a high (170) so - //there will be less number of exceptions (occur during dispatchBatch) in the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 350, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 350, false, false, null, true )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 500 )); - //close cache in remote site. This will automatically kill the remote receivers. - vm2.invoke(() -> WANTestBase.closeCache()); - - inv1.join(); - - //verify that all is well in local site - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 500 )); - vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty("ln" )); - } - - @Test - public void testReplicatedSerialPropagationWithRemoteReceiverRestarted() - throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - // these are part of remote site - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - // these are part of local site - createCacheInVMs(lnPort, vm4, vm5); - - // senders are created on local site. Batch size is kept to a high (170) so - // there will be less number of exceptions (occur during dispatchBatch) in - // the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 350, false, false, null, true )); - - // create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - // start the senders on local site - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - // create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - - // start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); - // close cache in remote site. This will automatically kill the remote - // receivers. - Wait.pause(2000); - vm2.invoke(() -> WANTestBase.closeCache()); - - inv1.join(); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - - // verify that all is well in local site - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 8000 )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - - vm2.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 )); - } - - @Test - public void testReplicatedSerialPropagationWithRemoteReceiverRestarted_SenderReceiverPersistent() - throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - // these are part of remote site - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - // these are part of local site - createCacheInVMs(lnPort, vm4, vm5); - - // senders are created on local site. Batch size is kept to a high (170) so - // there will be less number of exceptions (occur during dispatchBatch) in - // the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 350, false, true, null, true )); - vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 350, false, true, null, true)); - - // create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); - vm2.invoke(() -> WANTestBase.addListenerToSleepAfterCreateEvent(2000, getTestMethodName() + "_RR_1")); - // start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - // create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR_1", "ln", isOffHeap())); - - // start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); - // close cache in remote site. This will automatically kill the remote - // receivers. - vm2.invoke(() -> WANTestBase.closeCache()); - - inv1.join(); - - // verify that all is well in local site - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); - - vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); - vm2.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0)); - - vm2.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats(1, 1)); - - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); - } - - @Test - public void testReplicatedSerialPropagationWithRemoteSiteBouncedBack_ReceiverPersistent() - throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - // these are part of remote site - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - // these are part of local site - createCacheInVMs(lnPort, vm4, vm5); - - // senders are created on local site. Batch size is kept to a high (170) so - // there will be less number of exceptions (occur during dispatchBatch) in - // the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 350, false, false, null, true )); - - // create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - // start the senders on local site - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - // create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - // start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); - // close cache in remote site. This will automatically kill the remote - // receivers. - Wait.pause(2000); - vm1.invoke(() -> WANTestBase.shutdownLocator()); - vm2.invoke(() -> WANTestBase.closeCache()); - - inv1.join(); - - // Do some extra puts after cache close so that some events are in the queue. - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - - // verify that all is well in local site - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 8000 )); - - vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" )); - - vm1.invoke(() -> WANTestBase.bringBackLocatorOnOldPort( - 2, lnPort, nyPort )); - - createCacheInVMs(nyPort, vm2); - - vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 8000 )); - - vm2.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 )); - } - - @Test - public void testReplicatedSerialPropagationWithRemoteSiteBouncedBackWithMultipleRemoteLocators() - throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort1 = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer nyPort2 = (Integer) vm3.invoke(() -> WANTestBase.createSecondRemoteLocator( 2, nyPort1, lnPort )); - - // these are part of remote site - createCacheInVMs(nyPort1, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - // these are part of local site - createCacheInVMs(lnPort, vm4, vm5); - - // senders are created on local site. Batch size is kept to a high (170) so - // there will be less number of exceptions (occur during dispatchBatch) in - // the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 350, false, false, null, true )); - - // create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - // start the senders on local site - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - // create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - // start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); - // close cache in remote site. This will automatically kill the remote - // receivers. - Wait.pause(2000); - vm1.invoke(() -> WANTestBase.shutdownLocator()); - vm2.invoke(() -> WANTestBase.closeCache()); - - inv1.join(); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - // verify that all is well in local site - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 8000 )); - - vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" )); - - createCacheInVMs(nyPort2, vm6); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - - vm6.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 )); - } - - @Test - public void testReplicatedSerialPropagationWithRemoteReceiverRestartedOnOtherNode() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - // these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - - // these are part of local site - createCacheInVMs(lnPort, vm4); - - // senders are created on local site. Batch size is kept to a high (170) so - // there will be less number of exceptions (occur during dispatchBatch) in - // the log - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 350, false, false, null, true )); - - // create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); - vm2.invoke(() -> WANTestBase.createReceiver()); - - vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); - - vm2.invoke(() -> addListenerToSleepAfterCreateEvent(2000, getTestMethodName() + "_RR_1")); - vm3.invoke(() -> addListenerToSleepAfterCreateEvent(2000, getTestMethodName() + "_RR_1")); - - // create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR_1", "ln", isOffHeap())); - // start the senders on local site - vm4.invoke(() -> WANTestBase.startSender("ln")); - - // start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 )); - // close cache in remote site. This will automatically kill the remote - // receivers. - vm2.invoke(() -> WANTestBase.closeCache()); - vm3.invoke(() -> WANTestBase.closeCache()); - - inv1.join(); - - // verify that all is well in local site - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); - - vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" )); - - createCacheInVMs(nyPort, vm3); - vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap())); - vm3.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0)); - - vm3.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 )); - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000)); - } - - @Test - public void testReplicatedSerialPropagationToTwoWanSites() throws Exception { - Integer lnPort = createFirstLocatorWithDSId(1); - Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - createCacheInVMs(tkPort, vm3); - vm3.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("lnSerial1", vm4, vm5); - startSenderInVMs("lnSerial2", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testReplicatedSerialPropagationHA() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - vm6.invoke(createReplicatedRegionRunnable()); - vm7.invoke(createReplicatedRegionRunnable()); - - AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); - Wait.pause(2000); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); - - inv1.join(); - inv2.join(); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - } - - /** - * Local site:: vm4: Primary vm5: Secondary - * - * Remote site:: vm2, vm3, vm6, vm7: All hosting receivers - * - * vm4 is killed, so vm5 takes primary charge - * - * SUR: disabling due to connection information not available in open source - * enable this once in closed source - */ - @Ignore("TODO: test is disabled") - @Test - public void testReplicatedSerialPropagationHA_ReceiverAffinity() - throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3, vm6, vm7); - createReceiverInVMs(vm2, vm3, vm6, vm7); - - LogWriterUtils.getLogWriter().info("Started receivers on remote site"); - - WANTestBase.createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - LogWriterUtils.getLogWriter().info("Started senders on local site"); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - - AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); - LogWriterUtils.getLogWriter().info("Started async puts on local site"); - Wait.pause(1000); - - Map oldConnectionInfo = (Map)vm4.invoke(() -> WANTestBase.getSenderToReceiverConnectionInfo( "ln" )); - assertNotNull(oldConnectionInfo); - String oldServerHost = (String)oldConnectionInfo.get("serverHost"); - int oldServerPort = (Integer)oldConnectionInfo.get("serverPort"); - LogWriterUtils.getLogWriter().info("Got sender to receiver connection information"); - - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); - inv2.join(); - LogWriterUtils.getLogWriter().info("Killed primary sender on local site"); - Wait.pause(5000);// give some time for vm5 to take primary charge - - Map newConnectionInfo = (Map)vm5.invoke(() -> WANTestBase.getSenderToReceiverConnectionInfo( "ln" )); - assertNotNull(newConnectionInfo); - String newServerHost = (String)newConnectionInfo.get("serverHost"); - int newServerPort = (Integer)newConnectionInfo.get("serverPort"); - LogWriterUtils.getLogWriter().info("Got new sender to receiver connection information"); - assertEquals(oldServerHost, newServerHost); - assertEquals(oldServerPort, newServerPort); - - LogWriterUtils.getLogWriter() - .info( - "Matched the new connection info with old connection info. Receiver affinity verified."); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - } - - /** - * Local site:: vm4: Primary vm5: Secondary - * - * Remote site:: vm2, vm3, vm6, vm7: All hosting receivers - * - * vm4 is killed, so vm5 takes primary charge. vm4 brought up. vm5 is killed, - * so vm4 takes primary charge again. - * - * SUR: commenting due to connection information not available in open source - * enable this once in closed source - */ - @Ignore("TODO: test is disabled") - @Test - public void testReplicatedSerialPropagationHA_ReceiverAffinityScenario2() - throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3, vm6, vm7); - createReceiverInVMs(vm2, vm3, vm6, vm7); - - LogWriterUtils.getLogWriter().info("Started receivers on remote site"); - - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - LogWriterUtils.getLogWriter().info("Started senders on local site"); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - - AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); - LogWriterUtils.getLogWriter().info("Started async puts on local site"); - Wait.pause(1000); - - Map oldConnectionInfo = (Map)vm4.invoke(() -> WANTestBase.getSenderToReceiverConnectionInfo( "ln" )); - assertNotNull(oldConnectionInfo); - String oldServerHost = (String)oldConnectionInfo.get("serverHost"); - int oldServerPort = (Integer)oldConnectionInfo.get("serverPort"); - LogWriterUtils.getLogWriter().info("Got sender to receiver connection information"); - - // ---------------------------- KILL vm4 - // -------------------------------------- - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); - inv2.join(); - LogWriterUtils.getLogWriter().info("Killed vm4 (primary sender) on local site"); - // ----------------------------------------------------------------------------- - - vm5.invoke(() -> WANTestBase.waitForSenderToBecomePrimary( "ln" )); - LogWriterUtils.getLogWriter().info("vm5 sender has now acquired primary status"); - Wait.pause(5000);// give time to process unprocessedEventsMap - - // ---------------------------REBUILD vm4 - // -------------------------------------- - LogWriterUtils.getLogWriter().info("Rebuilding vm4...."); - createCacheInVMs(lnPort, vm4); - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm4.invoke(createReplicatedRegionRunnable()); - LogWriterUtils.getLogWriter().info("Rebuilt vm4"); - // ----------------------------------------------------------------------------- - - // --------------------------- KILL vm5 - // ---------------------------------------- - inv1.join();// once the puts are done, kill vm5 - LogWriterUtils.getLogWriter().info("puts in vm5 are done"); - - inv2 = vm5.invokeAsync(() -> WANTestBase.killSender()); - inv2.join(); - vm4.invoke(() -> WANTestBase.waitForSenderToBecomePrimary( "ln" )); - // ----------------------------------------------------------------------------- - - Map newConnectionInfo = (Map)vm4.invoke(() -> WANTestBase.getSenderToReceiverConnectionInfo( "ln" )); - assertNotNull(newConnectionInfo); - String newServerHost = (String)newConnectionInfo.get("serverHost"); - int newServerPort = (Integer)newConnectionInfo.get("serverPort"); - LogWriterUtils.getLogWriter().info("Got new sender to receiver connection information"); - assertEquals(oldServerHost, newServerHost); - assertEquals(oldServerPort, newServerPort); - LogWriterUtils.getLogWriter() - .info( - "Matched the new connection info with old connection info. Receiver affinity verified."); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - } - - @Test - public void testNormalRegionSerialPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - - vm4.invoke(() -> WANTestBase.createNormalRegion( - getTestMethodName() + "_RR", "ln" )); - vm5.invoke(() -> WANTestBase.createNormalRegion( - getTestMethodName() + "_RR", "ln" )); - - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, - 0, 0, 0)); - - vm5.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, - 1000, 0, 0 )); - - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000)); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0)); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0 )); - - vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0)); - } - - /** - * Added for defect #48582 NPE when WAN sender configured but not started. - * All to all topology with 2 WAN sites: - * Site 1 (LN site): vm4, vm5, vm6, vm7 - * Site 2 (NY site): vm2, vm3 - */ - @Test - public void testReplicatedSerialPropagationWithRemoteSenderConfiguredButNotStarted() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - createReceiverInVMs(vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(createReplicatedRegionRunnable()); - vm5.invoke(createReplicatedRegionRunnable()); - vm6.invoke(createReplicatedRegionRunnable()); - vm7.invoke(createReplicatedRegionRunnable()); - - vm2.invoke(() -> WANTestBase.createSender( "ny", 1, - false, 100, 10, false, false, null, true )); - vm3.invoke(() -> WANTestBase.createSender( "ny", 1, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny", isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - startSenderInVMs("ny", vm2, vm3); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java deleted file mode 100644 index 6859249..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java +++ /dev/null @@ -1,513 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -@Category(DistributedTest.class) -public class SerialWANPropagationLoopBackDUnitTest extends WANTestBase { - - private static final long serialVersionUID = 1L; - - public SerialWANPropagationLoopBackDUnitTest() { - super(); - } - - @Test - public void testReplicatedSerialPropagationLoopBack() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> WANTestBase.createCache( lnPort )); - vm3.invoke(() -> WANTestBase.createCache( nyPort )); - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( nyPort )); - vm7.invoke(() -> WANTestBase.createCache( nyPort )); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ny", 1, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm6.invoke(() -> WANTestBase.startSender( "ny" )); - - vm4.invoke(() -> WANTestBase.addQueueListener( "ln", - false )); - vm6.invoke(() -> WANTestBase.addQueueListener( "ny", - false )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap())); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny", isOffHeap())); - - final Map keyValues = new HashMap(); - for(int i=0; i< 1; i++) { - keyValues.put(i, i); - } - vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - keyValues.clear(); - for(int i=1; i< 2; i++) { - keyValues.put(i, i); - } - vm6.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 2 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 2 )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 2 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 2 )); - vm6.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 2 )); - vm7.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 2 )); - - - Wait.pause(5000); - vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 )); - vm6.invoke(() -> WANTestBase.verifyQueueSize( "ny", 0 )); - - Map queueMap1 = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); - Map queueMap2 = (HashMap)vm6.invoke(() -> WANTestBase.checkQueue()); - - List createList1 = (List)queueMap1.get("Create"); - List updateList1 = (List)queueMap1.get("Update"); - List createList2 = (List)queueMap2.get("Create"); - List updateList2 = (List)queueMap2.get("Update"); - - assertEquals(0, updateList1.size()); - assertEquals(0, updateList2.size()); - - assertEquals(1, createList1.size()); - assertEquals(1, createList2.size()); - } - - @Test - public void testReplicatedSerialPropagationLoopBack3SitesLoop() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - createCacheInVMs(lnPort, vm3, vm6); - createCacheInVMs(nyPort, vm4, vm7); - createCacheInVMs(tkPort, vm5); - - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap())); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny", isOffHeap())); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "tk", isOffHeap())); - - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny", isOffHeap() )); - - vm3.invoke(() -> WANTestBase.createReceiver()); - vm4.invoke(() -> WANTestBase.createReceiver()); - vm5.invoke(() -> WANTestBase.createReceiver()); - - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ny", 3, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "tk", 1, - false, 100, 10, false, false, null, true )); - - vm6.invoke(() -> WANTestBase.startSender( "ln" )); - vm7.invoke(() -> WANTestBase.startSender( "ny" )); - vm5.invoke(() -> WANTestBase.startSender( "tk" )); - - // using vm5 for sender in ds 3. cache is already created. - vm6.invoke(() -> WANTestBase.addQueueListener( "ln", - false )); - vm7.invoke(() -> WANTestBase.addQueueListener( "ny", - false )); - vm5.invoke(() -> WANTestBase.addQueueListener( "tk", - false )); - - int totalSize = 3; - int increment = 1; - - final Map keyValues = new HashMap(); - for(int i=0; i< increment; i++) { - keyValues.put(i, i); - } - vm3.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - keyValues.clear(); - for(int i = increment; i< 2*increment; i++) { - keyValues.put(i, i); - } - vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - keyValues.clear(); - for(int i=2*increment; i< totalSize; i++) { - keyValues.put(i, i); - } - vm5.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", totalSize )); - vm6.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", totalSize )); - vm7.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", totalSize )); - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", totalSize )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", totalSize )); - - - vm6.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 )); - vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny", 0 )); - vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 0 )); - - - - Map queueMap1 = (HashMap)vm6.invoke(() -> WANTestBase.checkQueue()); - Map queueMap2 = (HashMap)vm7.invoke(() -> WANTestBase.checkQueue()); - Map queueMap3 = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); - - List createList1 = (List)queueMap1.get("Create"); - List updateList1 = (List)queueMap1.get("Update"); - List createList2 = (List)queueMap2.get("Create"); - List updateList2 = (List)queueMap2.get("Update"); - List createList3 = (List)queueMap3.get("Create"); - List updateList3 = (List)queueMap3.get("Update"); - - assertEquals(0, updateList1.size()); - assertEquals(0, updateList2.size()); - assertEquals(0, updateList3.size()); - - assertEquals(2, createList1.size()); - assertEquals(2, createList2.size()); - assertEquals(2, createList3.size()); - } - - - @Test - public void testReplicatedSerialPropagationLoopBack3SitesNtoNPutInAllDS() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - createCacheInVMs(lnPort, vm3, vm6); - createCacheInVMs(nyPort, vm4, vm7); - createCacheInVMs(tkPort, vm5); - - vm3.invoke(() -> WANTestBase.createReceiver()); - vm4.invoke(() -> WANTestBase.createReceiver()); - vm5.invoke(() -> WANTestBase.createReceiver()); - - // using vm5 for sender in ds 3. cache is already created. - - vm6.invoke(() -> WANTestBase.createSender( "ln1", 2, - false, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ny1", 3, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "tk1", 1, - false, 100, 10, false, false, null, true )); - - vm6.invoke(() -> WANTestBase.createSender( "ln2", 3, - false, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ny2", 1, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "tk2", 2, - false, 100, 10, false, false, null, true )); - - - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1,ln2", isOffHeap())); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny1,ny2", isOffHeap())); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "tk1,tk2", isOffHeap() )); - - vm6.invoke(() -> WANTestBase.startSender( "ln1" )); - vm7.invoke(() -> WANTestBase.startSender( "ny1" )); - vm5.invoke(() -> WANTestBase.startSender( "tk1" )); - - vm6.invoke(() -> WANTestBase.startSender( "ln2" )); - vm7.invoke(() -> WANTestBase.startSender( "ny2" )); - vm5.invoke(() -> WANTestBase.startSender( "tk2" )); - - - vm6.invoke(() -> WANTestBase.addQueueListener( "ln1", - false )); - vm7.invoke(() -> WANTestBase.addQueueListener( "ny1", - false )); - vm5.invoke(() -> WANTestBase.addQueueListener( "tk1", - false )); - vm6.invoke(() -> WANTestBase.addSecondQueueListener( "ln2", - false )); - vm7.invoke(() -> WANTestBase.addSecondQueueListener( "ny2", - false )); - vm5.invoke(() -> WANTestBase.addSecondQueueListener( "tk2", - false )); - - - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1,ln2", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny1,ny2", isOffHeap() )); - - final Map keyValues = new HashMap(); - for(int i=0; i< 1; i++) { - keyValues.put(i, i); - } - vm3.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - keyValues.clear(); - for(int i=1; i< 2; i++) { - keyValues.put(i, i); - } - vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - keyValues.clear(); - for(int i=2; i< 3; i++) { - - keyValues.put(i, i); - } - vm5.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - Wait.pause(2000); - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 3 )); - vm6.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 3 )); - vm7.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 3 )); - - Wait.pause(5000); - vm6.invoke(() -> WANTestBase.verifyQueueSize( "ln1", 0 )); - vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 )); - vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk1", 0 )); - vm6.invoke(() -> WANTestBase.verifyQueueSize( "ln2", 0 )); - vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 )); - vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk2", 0 )); - - - Map queueMap1 = (HashMap)vm6.invoke(() -> WANTestBase.checkQueue()); - Map queueMap2 = (HashMap)vm7.invoke(() -> WANTestBase.checkQueue()); - Map queueMap3 = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); - Map queueMap4 = (HashMap)vm6.invoke(() -> WANTestBase.checkQueue2()); - Map queueMap5 = (HashMap)vm7.invoke(() -> WANTestBase.checkQueue2()); - Map queueMap6 = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue2()); - - List createList1 = (List)queueMap1.get("Create"); - List updateList1 = (List)queueMap1.get("Update"); - List createList2 = (List)queueMap2.get("Create"); - List updateList2 = (List)queueMap2.get("Update"); - List createList3 = (List)queueMap3.get("Create"); - List updateList3 = (List)queueMap3.get("Update"); - - List createList4 = (List)queueMap4.get("Create"); - List updateList4 = (List)queueMap4.get("Update"); - - List createList5 = (List)queueMap5.get("Create"); - List updateList5 = (List)queueMap5.get("Update"); - - List createList6 = (List)queueMap6.get("Create"); - List updateList6 = (List)queueMap6.get("Update"); - - - assertEquals(0, updateList1.size()); - assertEquals(0, updateList2.size()); - assertEquals(0, updateList3.size()); - assertEquals(0, updateList4.size()); - assertEquals(0, updateList5.size()); - assertEquals(0, updateList6.size()); - - assertEquals(1, createList1.size()); - assertEquals(1, createList2.size()); - assertEquals(1, createList3.size()); - assertEquals(1, createList4.size()); - assertEquals(1, createList5.size()); - assertEquals(1, createList6.size()); - } - - - @Test - public void testReplicatedSerialPropagationLoopBack3SitesNtoNPutFromOneDS() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - vm3.invoke(() -> WANTestBase.createCache( lnPort )); - vm4.invoke(() -> WANTestBase.createCache( nyPort )); - vm5.invoke(() -> WANTestBase.createCache( tkPort )); - vm3.invoke(() -> WANTestBase.createReceiver()); - vm4.invoke(() -> WANTestBase.createReceiver()); - vm5.invoke(() -> WANTestBase.createReceiver()); - - - vm3.invoke(() -> WANTestBase.createSender( "ln1", 2, - false, 100, 10, false, false, null, true )); - vm3.invoke(() -> WANTestBase.createSender( "ln2", 3, - false, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createSender( "ny1", 3, - false, 100, 10, false, false, null, true )); - vm4.invoke(() -> WANTestBase.createSender( "ny2", 1, - false, 100, 10, false, false, null, true )); - - vm5.invoke(() -> WANTestBase.createSender( "tk1", 1, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "tk2", 2, - false, 100, 10, false, false, null, true )); - - - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1,ln2", isOffHeap() )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ny1,ny2", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "tk1,tk2", isOffHeap() )); - - - vm3.invoke(() -> WANTestBase.startSender( "ln1" )); - vm4.invoke(() -> WANTestBase.startSender( "ny1" )); - vm5.invoke(() -> WANTestBase.startSender( "tk1" )); - - vm3.invoke(() -> WANTestBase.startSender( "ln2" )); - vm4.invoke(() -> WANTestBase.startSender( "ny2" )); - vm5.invoke(() -> WANTestBase.startSender( "tk2" )); - - - vm3.invoke(() -> WANTestBase.addQueueListener( "ln1", - false )); - vm4.invoke(() -> WANTestBase.addQueueListener( "ny1", - false )); - vm5.invoke(() -> WANTestBase.addQueueListener( "tk1", - false )); - vm3.invoke(() -> WANTestBase.addSecondQueueListener( "ln2", - false )); - vm4.invoke(() -> WANTestBase.addSecondQueueListener( "ny2", - false )); - vm5.invoke(() -> WANTestBase.addSecondQueueListener( "tk2", - false )); - - final Map keyValues = new HashMap(); - for(int i=0; i< 1; i++) { - keyValues.put(i, i); - } - vm3.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1 )); - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1 )); - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1 )); - - Wait.pause(5000); - vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln1", 0 )); - vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 )); - vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk1", 0 )); - vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln2", 0 )); - vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 )); - vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk2", 0 )); - - - Map queueMap1 = (HashMap)vm3.invoke(() -> WANTestBase.checkQueue()); - Map queueMap2 = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue()); - Map queueMap3 = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); - Map queueMap4 = (HashMap)vm3.invoke(() -> WANTestBase.checkQueue2()); - Map queueMap5 = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue2()); - Map queueMap6 = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue2()); - - List createList1 = (List)queueMap1.get("Create"); - List updateList1 = (List)queueMap1.get("Update"); - List createList2 = (List)queueMap2.get("Create"); - List updateList2 = (List)queueMap2.get("Update"); - List createList3 = (List)queueMap3.get("Create"); - List updateList3 = (List)queueMap3.get("Update"); - - List createList4 = (List)queueMap4.get("Create"); - List updateList4 = (List)queueMap4.get("Update"); - - List createList5 = (List)queueMap5.get("Create"); - List updateList5 = (List)queueMap5.get("Update"); - - List createList6 = (List)queueMap6.get("Create"); - List updateList6 = (List)queueMap6.get("Update"); - - - assertEquals(0, updateList1.size()); - assertEquals(0, updateList2.size()); - assertEquals(0, updateList3.size()); - assertEquals(0, updateList4.size()); - assertEquals(0, updateList5.size()); - assertEquals(0, updateList6.size()); - - assertEquals(1, createList1.size()); - assertEquals(0, createList2.size()); - assertEquals(0, createList3.size()); - assertEquals(1, createList4.size()); - assertEquals(0, createList5.size()); - assertEquals(0, createList6.size()); - } - -}
