// Recovered from the commit notification for
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
// The diff recorded this file as deleted (index b754254..0000000, hunk -1,499 +0,0).
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gemstone.gemfire.internal.cache.wan.parallel;

import org.junit.Ignore;
import org.junit.experimental.categories.Category;
import org.junit.Test;

import static org.junit.Assert.*;

import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
import com.gemstone.gemfire.test.junit.categories.DistributedTest;

import static com.gemstone.gemfire.test.dunit.Wait.*;
import static com.gemstone.gemfire.test.dunit.IgnoredException.*;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.VM;
import com.gemstone.gemfire.test.junit.categories.FlakyTest;

/**
 * Distributed tests that read the counters returned by
 * {@code WANTestBase.getSenderStats} on the sender members (vm4..vm7) and call
 * {@code WANTestBase.checkGatewayReceiverStats} on the receiver members, for a
 * partitioned region named after the running test method.
 *
 * <p>Every scenario follows the same outline: create the locators, create the
 * receiver side, create the "ln" sender(s) and the sender-side region, generate
 * traffic, then sum each counter over the sender members and compare the total
 * with the expected value.
 */
@Category(DistributedTest.class)
public class ParallelWANStatsDUnitTest extends WANTestBase {

  private static final int NUM_PUTS = 100;
  private static final long serialVersionUID = 1L;

  // Positions of the counters inside the list returned by
  // WANTestBase.getSenderStats, as labelled by the assertions in this class.
  private static final int QUEUE_SIZE = 0;
  private static final int EVENTS_RECEIVED = 1;
  private static final int EVENTS_QUEUED = 2;
  private static final int EVENTS_DISTRIBUTED = 3;
  private static final int BATCHES_DISTRIBUTED = 4;
  private static final int BATCHES_REDISTRIBUTED = 5;
  private static final int EVENTS_FILTERED = 6;
  private static final int EVENTS_CONFLATED = 7;

  /** Human-readable counter names, indexed by the constants above; used in failure messages. */
  private static final String[] STAT_LABELS = {
      "queue size",
      "events received",
      "events queued",
      "events distributed",
      "batches distributed",
      "batches redistributed",
      "events filtered",
      "events conflated"
  };

  /** Name of the region under test; set to the test method name before each test. */
  private String testName;

  public ParallelWANStatsDUnitTest() {
    super();
  }

  @Override
  protected final void postSetUpWANTestBase() throws Exception {
    this.testName = getTestMethodName();
  }

  /**
   * Senders are started and then paused, so after {@link #NUM_PUTS} puts every
   * event must still be queued and nothing may have been distributed.
   */
  @Test
  public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2, vm3);
    createReceiverInVMs(vm2, vm3);

    createSendersWithConflation(lnPort);

    createSenderPRs(0);

    startPausedSenders();

    createReceiverPR(vm2, 1);
    createReceiverPR(vm3, 1);

    putKeyValues();

    List<List<Integer>> stats = senderStats("ln", NUM_PUTS, vm4, vm5, vm6, vm7);

    assertStatTotal(NUM_PUTS, stats, QUEUE_SIZE);
    assertStatTotal(NUM_PUTS, stats, EVENTS_RECEIVED);
    assertStatTotal(NUM_PUTS, stats, EVENTS_QUEUED);
    assertStatTotal(0, stats, EVENTS_DISTRIBUTED);
    assertStatTotal(0, stats, BATCHES_DISTRIBUTED);
    assertStatTotal(0, stats, BATCHES_REDISTRIBUTED);
  }

  /**
   * Sender region without redundant copies: once the receiver holds all
   * {@link #NUM_PUTS} entries the queues must be empty and each event must have
   * been received, queued and distributed exactly once in total.
   */
  @Test
  public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy() throws Exception {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    createReceiverInVMs(vm2);

    createSenders(lnPort);

    createReceiverPR(vm2, 0);

    createSenderPRs(0);

    startSenderInVMs("ln", vm4, vm5, vm6, vm7);

    vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS));

    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));

    List<List<Integer>> stats = senderStats("ln", 0, vm4, vm5, vm6, vm7);

    assertStatTotal(0, stats, QUEUE_SIZE);
    assertStatTotal(NUM_PUTS, stats, EVENTS_RECEIVED);
    assertStatTotal(NUM_PUTS, stats, EVENTS_QUEUED);
    assertStatTotal(NUM_PUTS, stats, EVENTS_DISTRIBUTED);
    assertStatTotalAtLeast(10, stats, BATCHES_DISTRIBUTED);
    assertStatTotal(0, stats, BATCHES_REDISTRIBUTED);

    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
  }

  /**
   * Same flow as the no-redundancy test but the sender region is created with
   * redundancy 3. The received/queued totals are expected to be 400 while only
   * {@link #NUM_PUTS} events are distributed.
   */
  @Test
  public void testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3() throws Exception {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    createReceiverInVMs(vm2);

    createSenders(lnPort);

    createReceiverPR(vm2, 0);

    createSenderPRs(3);

    startSenderInVMs("ln", vm4, vm5, vm6, vm7);

    vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS));

    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));

    List<List<Integer>> stats = senderStats("ln", 0, vm4, vm5, vm6, vm7);

    assertStatTotal(0, stats, QUEUE_SIZE);
    // NOTE(review): 400 is presumably NUM_PUTS times the four members that each
    // hold a copy with redundancy 3 - confirm before replacing the literal.
    assertStatTotal(400, stats, EVENTS_RECEIVED);
    assertStatTotal(400, stats, EVENTS_QUEUED);
    assertStatTotal(NUM_PUTS, stats, EVENTS_DISTRIBUTED);
    assertStatTotalAtLeast(10, stats, BATCHES_DISTRIBUTED);
    assertStatTotal(0, stats, BATCHES_REDISTRIBUTED);

    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
  }

  /**
   * One member (vm4) hosts two senders, "ln1" and "ln2", targeting remote sites
   * 2 and 3. Each sender's counters are checked on their own, and both
   * receivers are checked afterwards.
   */
  @Test
  public void testWANStatsTwoWanSites_Bug44331() throws Exception {
    // Unlike the other tests, the first locator is created by a direct call
    // here rather than through vm0.invoke(...).
    Integer lnPort = createFirstLocatorWithDSId(1);
    Integer nyPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
    Integer tkPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort));

    createCacheInVMs(nyPort, vm2);
    createCacheInVMs(tkPort, vm3);
    createReceiverInVMs(vm2);
    createReceiverInVMs(vm3);

    vm4.invoke(() -> WANTestBase.createCache(lnPort));

    vm4.invoke(() -> WANTestBase.createSender("ln1", 2, true, 100, 10, false, false, null, true));

    vm4.invoke(() -> WANTestBase.createSender("ln2", 3, true, 100, 10, false, false, null, true));

    createReceiverPR(vm2, 0);
    vm3.invoke(() -> WANTestBase.createPartitionedRegion(testName, null, 0, 10, isOffHeap()));

    vm4.invoke(() -> WANTestBase.createPartitionedRegion(testName, "ln1,ln2", 0, 10, isOffHeap()));

    vm4.invoke(() -> WANTestBase.startSender("ln1"));

    vm4.invoke(() -> WANTestBase.startSender("ln2"));

    vm4.invoke(() -> WANTestBase.doPuts(testName, NUM_PUTS));

    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));
    vm3.invoke(() -> WANTestBase.validateRegionSize(testName, NUM_PUTS));

    // Fetch both senders' counters before asserting, as the original did.
    List<List<Integer>> sender1Stats = senderStats("ln1", 0, vm4);
    List<List<Integer>> sender2Stats = senderStats("ln2", 0, vm4);

    assertStatTotal(0, sender1Stats, QUEUE_SIZE);
    assertStatTotal(NUM_PUTS, sender1Stats, EVENTS_RECEIVED);
    assertStatTotal(NUM_PUTS, sender1Stats, EVENTS_QUEUED);
    assertStatTotal(NUM_PUTS, sender1Stats, EVENTS_DISTRIBUTED);
    assertStatTotalAtLeast(10, sender1Stats, BATCHES_DISTRIBUTED);
    assertStatTotal(0, sender1Stats, BATCHES_REDISTRIBUTED);

    assertStatTotal(0, sender2Stats, QUEUE_SIZE);
    assertStatTotal(NUM_PUTS, sender2Stats, EVENTS_RECEIVED);
    assertStatTotal(NUM_PUTS, sender2Stats, EVENTS_QUEUED);
    assertStatTotal(NUM_PUTS, sender2Stats, EVENTS_DISTRIBUTED);
    assertStatTotalAtLeast(10, sender2Stats, BATCHES_DISTRIBUTED);
    assertStatTotal(0, sender2Stats, BATCHES_REDISTRIBUTED);

    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
    vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS));
  }

  /**
   * While vm5 performs 1000 puts asynchronously, {@code WANTestBase.killSender}
   * is invoked on vm4. The counters of the remaining members (vm5..vm7) are
   * then checked.
   */
  @Test
  public void testParallelPropagationHA() throws Exception {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    createReceiverInVMs(vm2);

    createSenders(lnPort);

    createReceiverPR(vm2, 0);

    createSenderPRs(3);

    startSenderInVMs("ln", vm4, vm5, vm6, vm7);

    AsyncInvocation putsInProgress = vm5.invokeAsync(() -> WANTestBase.doPuts(testName, 1000));
    pause(200);
    AsyncInvocation senderKill = vm4.invokeAsync(() -> WANTestBase.killSender());
    // NOTE(review): only join() is called; whether a failure inside either
    // async task is surfaced by join() is not visible from here - confirm.
    putsInProgress.join();
    senderKill.join();

    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 1000));

    // vm4 is deliberately left out: its sender was killed above.
    List<List<Integer>> stats = senderStats("ln", 0, vm5, vm6, vm7);

    assertStatTotal(0, stats, QUEUE_SIZE);

    // We may see a single retried event on all members due to the kill.
    int receivedEvents = sumStat(stats, EVENTS_RECEIVED);
    assertTrue("Received " + receivedEvents, 3000 <= receivedEvents && 3003 >= receivedEvents);
    int queuedEvents = sumStat(stats, EVENTS_QUEUED);
    assertTrue("Queued " + queuedEvents, 3000 <= queuedEvents && 3003 >= queuedEvents);

    // Events distributed and batches distributed are intentionally not
    // asserted: it is quite possible that vm4 distributed some of the events
    // and batches before it was killed.
    assertStatTotal(0, stats, BATCHES_REDISTRIBUTED);

    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(NUM_PUTS, 1000, 1000));
  }

  /**
   * One region and sender are configured on the local site, and one region and
   * a receiver on the remote site. 2000 puts are done on the local region after
   * {@code WANTestBase.addCacheListenerAndDestroyRegion} has been invoked on the
   * remote member; the test then expects at least one batch to have been
   * distributed and at least one to have been redistributed.
   *
   * @throws Exception if any remote invocation fails
   */
  @Test
  public void testParallelPropagationWithRemoteRegionDestroy() throws Exception {
    addIgnoredException("RegionDestroyedException");
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    createReceiverPR(vm2, 0);
    createReceiverInVMs(vm2);

    createSenders(lnPort);

    vm2.invoke(() -> WANTestBase.addCacheListenerAndDestroyRegion(testName));

    createSenderPRs(0);

    startSenderInVMs("ln", vm4, vm5, vm6, vm7);

    // The puts run synchronously (invoke, not invokeAsync) from vm4.
    vm4.invoke(() -> WANTestBase.doPuts(testName, 2000));

    // Verify that all is well on the local site: every event should be present
    // in the local region.
    vm4.invoke(() -> WANTestBase.validateRegionSize(testName, 2000));

    List<List<Integer>> stats = senderStats("ln", -1, vm4, vm5, vm6, vm7);

    assertStatTotalAtLeast(1, stats, BATCHES_DISTRIBUTED);
    assertStatTotalAtLeast(1, stats, BATCHES_REDISTRIBUTED);
  }

  /**
   * Senders are created with a {@code MyGatewayEventFilter}. Of 1000 puts the
   * test expects 900 to be queued, 800 to be distributed and 200 to be counted
   * as filtered.
   */
  @Test
  public void testParallelPropagationWithFilter() throws Exception {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);

    createReceiverPR(vm2, 1);

    createReceiverInVMs(vm2);

    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);

    createSenderPRs(0);

    // A fresh filter instance is built inside each lambda, one per member.
    for (VM senderVm : senderVMs()) {
      senderVm.invoke(() -> WANTestBase.createSender("ln", 2,
          true, 100, 10, false, false,
          new MyGatewayEventFilter(), true));
    }

    startSenderInVMs("ln", vm4, vm5, vm6, vm7);

    vm4.invoke(() -> WANTestBase.doPuts(testName, 1000));

    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 800));

    List<List<Integer>> stats = senderStats("ln", 0, vm4, vm5, vm6, vm7);

    assertStatTotal(0, stats, QUEUE_SIZE);
    assertStatTotal(1000, stats, EVENTS_RECEIVED);
    assertStatTotal(900, stats, EVENTS_QUEUED);
    assertStatTotal(800, stats, EVENTS_DISTRIBUTED);
    assertStatTotalAtLeast(80, stats, BATCHES_DISTRIBUTED);
    assertStatTotal(0, stats, BATCHES_REDISTRIBUTED);
    assertStatTotal(200, stats, EVENTS_FILTERED);

    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800));
  }

  /**
   * With paused senders, {@link #NUM_PUTS} creates are followed by two rounds
   * of updates to the first 50 keys. The queue size is checked after each
   * round, the senders are resumed, and the final counters are expected to
   * report 50 conflated events.
   */
  @Test
  public void testParallelPropagationConflation() throws Exception {
    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

    createCacheInVMs(nyPort, vm2);
    createReceiverInVMs(vm2);

    createSendersWithConflation(lnPort);

    createSenderPRs(0);

    startPausedSenders();

    createReceiverPR(vm2, 1);

    Map keyValues = putKeyValues();
    final Map updateKeyValues = new HashMap();
    for (int key = 0; key < 50; key++) {
      updateKeyValues.put(key, key + "_updated");
    }

    vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues));

    vm4.invoke(() -> WANTestBase.checkQueueSize("ln",
        keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/));

    // Do the puts again. Since these are updates, the previous updates will be conflated.
    vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, updateKeyValues));

    vm4.invoke(() -> WANTestBase.checkQueueSize("ln",
        keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/));

    // Nothing may have reached the receiver while the senders are paused.
    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, 0));

    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0));

    for (VM senderVm : senderVMs()) {
      senderVm.invoke(() -> WANTestBase.resumeSender("ln"));
    }

    // Fold the updates into the expected contents before validating the receiver.
    keyValues.putAll(updateKeyValues);
    vm2.invoke(() -> WANTestBase.validateRegionSize(testName, keyValues.size()));

    vm2.invoke(() -> WANTestBase.validateRegionContents(testName, keyValues));

    vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 150, NUM_PUTS));

    vm4.invoke(() -> WANTestBase.checkQueueSize("ln", 0));

    List<List<Integer>> stats = senderStats("ln", 0, vm4, vm5, vm6, vm7);

    assertStatTotal(0, stats, QUEUE_SIZE);
    assertStatTotal(200, stats, EVENTS_RECEIVED);
    assertStatTotal(200, stats, EVENTS_QUEUED);
    assertStatTotal(150, stats, EVENTS_DISTRIBUTED);
    assertStatTotalAtLeast(10, stats, BATCHES_DISTRIBUTED);
    assertStatTotal(0, stats, BATCHES_REDISTRIBUTED);
    assertStatTotal(50, stats, EVENTS_CONFLATED);
  }

  /**
   * Puts the entries {@code i -> i} for {@code 0 <= i < NUM_PUTS} from vm4 and
   * then invokes {@code WANTestBase.checkQueueSize} on vm4 with that count.
   *
   * @return the map of entries that was put
   */
  protected Map putKeyValues() {
    final Map keyValues = new HashMap();
    for (int key = 0; key < NUM_PUTS; key++) {
      keyValues.put(key, key);
    }

    vm4.invoke(() -> WANTestBase.putGivenKeyValue(testName, keyValues));

    vm4.invoke(() -> WANTestBase.checkQueueSize("ln", keyValues.size()));

    return keyValues;
  }

  /**
   * Creates the test region on a receiver member, with no sender ids attached.
   *
   * @param vm the member to create the region in
   * @param redundancy value passed through to {@code WANTestBase.createPartitionedRegion}
   */
  protected void createReceiverPR(VM vm, int redundancy) {
    vm.invoke(() -> WANTestBase.createPartitionedRegion(
        testName, null, redundancy, 10, isOffHeap()));
  }

  /**
   * Creates the test region, attached to sender "ln", on vm4..vm7 in that order.
   *
   * @param redundancy value passed through to {@code WANTestBase.createPartitionedRegion}
   */
  protected void createSenderPRs(int redundancy) {
    for (VM senderVm : senderVMs()) {
      // isOffHeap() stays inside the lambda so it is evaluated where the lambda runs.
      senderVm.invoke(() -> WANTestBase.createPartitionedRegion(
          testName, "ln", redundancy, 10, isOffHeap()));
    }
  }

  /** Starts sender "ln" on vm4..vm7 and then pauses it on each of them in turn. */
  protected void startPausedSenders() {
    startSenderInVMs("ln", vm4, vm5, vm6, vm7);

    for (VM senderVm : senderVMs()) {
      senderVm.invoke(() -> pauseSender("ln"));
    }
  }

  /**
   * Creates the caches on vm4..vm7 via {@code createCacheInVMs} and then sender
   * "ln" on each of them with the sixth {@code createSender} argument set to
   * {@code true}.
   *
   * @param lnPort locator port of the local site
   */
  protected void createSendersWithConflation(Integer lnPort) {
    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);

    createLnSenders(true);
  }

  /**
   * Creates a cache on each of vm4..vm7 and then sender "ln" on each of them
   * with the sixth {@code createSender} argument set to {@code false}.
   *
   * @param lnPort locator port of the local site
   */
  protected void createSenders(Integer lnPort) {
    for (VM senderVm : senderVMs()) {
      senderVm.invoke(() -> WANTestBase.createCache(lnPort));
    }

    createLnSenders(false);
  }

  /** Returns the four sender members in the order the tests address them. */
  private VM[] senderVMs() {
    return new VM[] {vm4, vm5, vm6, vm7};
  }

  /**
   * Creates sender "ln" (remote site 2) on vm4..vm7, in that order.
   *
   * @param conflation sixth argument of {@code WANTestBase.createSender};
   *        presumably the conflation switch, judging by the callers' names - confirm
   */
  private void createLnSenders(boolean conflation) {
    for (VM senderVm : senderVMs()) {
      senderVm.invoke(() -> WANTestBase.createSender("ln", 2,
          true, 100, 10, conflation, false, null, true));
    }
  }

  /**
   * Invokes {@code WANTestBase.getSenderStats} on each given member, in order.
   *
   * @param senderId id of the sender whose counters are read
   * @param expectedQueueSize second argument of {@code getSenderStats}; the
   *        tests pass the queue size they expect, or -1 - exact semantics to be
   *        confirmed against {@code WANTestBase}
   * @param vms members to query
   * @return one counter list per member, in the order of {@code vms}
   */
  @SuppressWarnings("unchecked") // same cast the original performed on every call site
  private static List<List<Integer>> senderStats(String senderId, int expectedQueueSize, VM... vms) {
    List<List<Integer>> perVm = new ArrayList<>(vms.length);
    for (VM vm : vms) {
      perVm.add((List<Integer>) vm.invoke(
          () -> WANTestBase.getSenderStats(senderId, expectedQueueSize)));
    }
    return perVm;
  }

  /** Adds up the counter at {@code index} over all per-member lists. */
  private static int sumStat(List<List<Integer>> perVm, int index) {
    int total = 0;
    for (List<Integer> memberStats : perVm) {
      total += memberStats.get(index);
    }
    return total;
  }

  /** Asserts that the counter at {@code index}, summed over all members, equals {@code expected}. */
  private static void assertStatTotal(int expected, List<List<Integer>> perVm, int index) {
    assertEquals(STAT_LABELS[index], expected, sumStat(perVm, index));
  }

  /** Asserts that the counter at {@code index}, summed over all members, is at least {@code minimum}. */
  private static void assertStatTotalAtLeast(int minimum, List<List<Integer>> perVm, int index) {
    int total = sumStat(perVm, index);
    assertTrue(STAT_LABELS[index] + ": expected at least " + minimum + " but was " + total,
        total >= minimum);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java deleted file mode 100644 index b38d35b..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.CacheTransactionManager; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.execute.*; -import com.gemstone.gemfire.distributed.DistributedSystem; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.Wait; - -//The tests here are to validate changes introduced because a distributed deadlock -//was found that caused issues for a production customer. -// -//There are 4 tests which use sender gateways with primaries on different -//JVMs. Two tests use replicated and two use partitioned regions, and -//the tests vary the conserve-sockets setting. 
-// -//currently the 4th test using PR, conserve-sockets=true hangs/fails and is commented -//out to prevent issues -@Category(DistributedTest.class) -public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase { - - public SerialGatewaySenderDistributedDeadlockDUnitTest() { - super(); - } - - //Uses replicated regions and conserve-sockets=false - @Test - public void testPrimarySendersOnDifferentVMsReplicated() throws Exception { - - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); - - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - createCachesWith(Boolean.FALSE, nyPort, lnPort); - - createSerialSenders(); - - createReplicatedRegions(nyPort); - - //get one primary sender on vm4 and another primary on vm5 - //the startup order matters here - startSerialSenders(); - - //exercise region and gateway operations with different messaging - exerciseWANOperations(); - AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")); - AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")); - AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - - exerciseFunctions(); - - try { - invVM4transaction.join(); - invVM5transaction.join(); - invVM4.join(); - invVM5.join(); - - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - } - - //Uses partitioned regions and conserve-sockets=false - @Test - public void testPrimarySendersOnDifferentVMsPR() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); - - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - createCachesWith(Boolean.FALSE, nyPort, lnPort); - - 
createSerialSenders(); - - createPartitionedRegions(nyPort); - - startSerialSenders(); - - exerciseWANOperations(); - AsyncInvocation invVM4transaction - = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)); - AsyncInvocation invVM5transaction - = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)); - - AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - - exerciseFunctions(); - - try { - invVM4transaction.join(); - invVM5transaction.join(); - invVM4.join(); - invVM5.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - } - - //Uses replicated regions and conserve-sockets=true - @Test - public void testPrimarySendersOnDifferentVMsReplicatedSocketPolicy() throws Exception { - - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); - - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - createCachesWith(Boolean.TRUE, nyPort, lnPort); - - createSerialSenders(); - - createReplicatedRegions(nyPort); - - //get one primary sender on vm4 and another primary on vm5 - //the startup order matters here - startSerialSenders(); - - //exercise region and gateway operations with messaging - exerciseWANOperations(); - AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")); - AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")); - - AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - - exerciseFunctions(); - - try { - 
invVM4transaction.join(); - invVM5transaction.join(); - invVM4.join(); - invVM5.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - } - - //Uses partitioned regions and conserve-sockets=true - //this always causes a distributed deadlock - @Test - public void testPrimarySendersOnDifferentVMsPRSocketPolicy() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); - - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - - createCachesWith(Boolean.TRUE, nyPort, lnPort); - - createSerialSenders(); - - createPartitionedRegions(nyPort); - - startSerialSenders(); - - exerciseWANOperations(); - AsyncInvocation invVM4transaction - = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)); - AsyncInvocation invVM5transaction - = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)); - - AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); - - exerciseFunctions(); - - try { - invVM4transaction.join(); - invVM5transaction.join(); - invVM4.join(); - invVM5.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - } - - //************************************************************************** - //Utility methods used by tests - //************************************************************************** - private void createReplicatedRegions(Integer nyPort) throws Exception { - //create receiver - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, false)); - vm2.invoke(() -> WANTestBase.createReceiver()); - - //create senders - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + 
"_RR", "ln1,ln2", false)); - - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln1,ln2", false)); - } - - private void createCachesWith(Boolean socketPolicy, Integer nyPort, Integer lnPort) { - vm2.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, nyPort)); - - vm4.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort)); - - vm5.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort)); - } - - private void exerciseFunctions() throws Exception { - //do function calls that use a shared connection - for (int x = 0; x < 1000; x++) { - //setting it to Boolean.TRUE it should pass the test - vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE)); - vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE)); - } - for (int x = 0; x < 1000; x++) { - //setting the Boolean.FALSE below will cause a deadlock in some GFE versions - //setting it to Boolean.TRUE as above it should pass the test - //this is similar to the customer found distributed deadlock - vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE)); - vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE)); - } - } - - private void createPartitionedRegions(Integer nyPort) throws Exception { - //create remote receiver - vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_RR", - "", 0, 113, false)); - - vm2.invoke(() -> WANTestBase.createReceiver()); - - //create sender vms - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false)); - - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false)); - } - - private void 
exerciseWANOperations() throws Exception { - //note - some of these should be made async to truly exercise the - //messaging between the WAN gateways and members - - //exercise region and gateway operations - vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100)); - vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100)); - Wait.pause(2000); //wait for events to propagate - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100)); - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100)); - vm5.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 100)); - Wait.pause(2000);//wait for events to propagate - vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0)); - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0)); - vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100)); - vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100)); - Wait.pause(2000); //wait for events to propagate - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100)); - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100)); - vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doInvalidates(getTestMethodName() + "_RR", 100, 100)); - vm4.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10)); - vm5.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10)); - Wait.pause(2000);//wait for events to propagate - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000)); - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000)); - vm4.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 1000)); - Wait.pause(2000);//wait for events to propagate - vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0)); - vm2.invoke(() -> 
WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0)); - vm4.invoke(() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_RR", 100)); - Wait.pause(2000); - vm5.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100)); - vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100)); - } - - private void startSerialSenders() throws Exception { - //get one primary sender on vm4 and another primary on vm5 - //the startup order matters here so that primaries are - //on different JVM's - vm4.invoke(() -> WANTestBase.startSender("ln1")); - - vm5.invoke(() -> WANTestBase.startSender("ln2")); - - //start secondaries - vm5.invoke(() -> WANTestBase.startSender("ln1")); - - vm4.invoke(() -> WANTestBase.startSender("ln2")); - } - - private void createSerialSenders() throws Exception { - - vm4.invoke(() -> WANTestBase.createSender("ln1", 2, - false, 100, 10, false, false, null, true)); - - vm5.invoke(() -> WANTestBase.createSender("ln1", 2, - false, 100, 10, false, false, null, true)); - - vm4.invoke(() -> WANTestBase.createSender("ln2", 2, - false, 100, 10, false, false, null, true)); - - vm5.invoke(() -> WANTestBase.createSender("ln2", 2, - false, 100, 10, false, false, null, true)); - } - - public static void doFunctionPuts(String name, int num, Boolean useThreadOwnedSocket) throws Exception { - Region region = CacheFactory.getAnyInstance().getRegion(name); - FunctionService.registerFunction(new TestFunction()); - Execution exe = FunctionService.onRegion(region); - for (int x = 0; x < num; x++) { - exe.withArgs(useThreadOwnedSocket).execute("com.gemstone.gemfire.internal.cache.wan.serial.TestFunction"); - } - } - - public static void doTxPutsPR(String regionName, int numPuts, int size) throws Exception { - Region r = cache.getRegion(Region.SEPARATOR + regionName); - CacheTransactionManager mgr = cache.getCacheTransactionManager(); - for (int x = 0; x < numPuts; x++) { - int temp = (int) 
(Math.floor(Math.random() * size)); - try { - mgr.begin(); - r.put(temp, temp); - mgr.commit(); - } catch (com.gemstone.gemfire.cache.TransactionDataNotColocatedException txe) { - //ignore colocation issues or primary bucket issues - } catch (com.gemstone.gemfire.cache.CommitConflictException cce) { - //ignore - conflicts are ok and expected - } - } - } - - public static void doInvalidates(String regionName, int numInvalidates, int size) throws Exception { - Region r = cache.getRegion(Region.SEPARATOR + regionName); - for (int x = 0; x < numInvalidates; x++) { - int temp = (int) (Math.floor(Math.random() * size)); - try { - if (r.containsValueForKey(temp)) { - r.invalidate(temp); - } - } catch (com.gemstone.gemfire.cache.EntryNotFoundException entryNotFoundException) { - //ignore as an entry may not exist - } - } - } - -} - -class TestFunction implements Function { - - @Override - public boolean hasResult() { - return false; - } - - @Override - public void execute(FunctionContext fc) { - boolean option = (Boolean) fc.getArguments(); - if (option) { - DistributedSystem.setThreadsSocketPolicy(false); - } - RegionFunctionContext context = (RegionFunctionContext) fc; - Region local = context.getDataSet(); - local.put(randKeyValue(10), randKeyValue(10000)); - } - - @Override - public String getId() { - return this.getClass().getName(); - } - - @Override - public boolean optimizeForWrite() { - return false; - } - - @Override - public boolean isHA() { - return false; - } - - private int randKeyValue(int size) { - double temp = Math.floor(Math.random() * size); - return (int) temp; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java 
b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java deleted file mode 100644 index 8c255c1..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java +++ /dev/null @@ -1,383 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import com.jayway.awaitility.Awaitility; -import org.junit.Ignore; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.internal.AvailablePortHelper; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener; -import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener2; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -@Category(DistributedTest.class) -public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase { - - private static final long serialVersionUID = 1L; - - public SerialGatewaySenderEventListenerDUnitTest() { - super(); - } - - /** - * Test validates whether the listener attached receives all the events. - * this test hangs after the Darrel's checkin 36685. 
Need to work with Darrel.Commenting it out so that test suit will not hang - */ - @Ignore - @Test - public void testGatewaySenderEventListenerInvocationWithoutLocator() { - int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); - vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); - vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); - vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); - - vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, - false, 100, 10, false, false, null, false, true)); - vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, - false, 100, 10, false, false, null, false, true)); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - final Map keyValues = new HashMap(); - for(int i=0; i< 1000; i++) { - keyValues.put(i, i); - } - - vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", keyValues.size() )); - - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", keyValues.size() )); - - vm4.invoke(() -> WANTestBase.printEventListenerMap()); - vm5.invoke(() -> WANTestBase.printEventListenerMap()); - - fail("tried to invoke missing method"); -// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues )); - } - - /** - * Test validates whether the listener attached receives all the events. 
- */ - @Test - public void testGatewaySenderEventListenerInvocation() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, - false, 100, 10, false, false, null, false, true)); - vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, - false, 100, 10, false, false, null, false, true)); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - final Map keyValues = new HashMap(); - for(int i=0; i< 1000; i++) { - keyValues.put(i, i); - } - - vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", keyValues.size() )); - - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", keyValues.size() )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0 )); - - vm4.invoke(() -> 
SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues )); - } - - /** - * Test validates whether the listener attached receives all the events. - * When there are 2 listeners attached to the GatewaySender. - */ - @Test - public void testGatewaySender2EventListenerInvocation() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, - false, 100, 10, false, false, null, true, true)); - vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, - false, 100, 10, false, false, null, true, true)); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - final Map keyValues = new HashMap(); - for(int i=0; i< 1000; i++) { - keyValues.put(i, i); - } - - - vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0 )); - - // TODO: move validateReceivedEventsMapSizeListener2 to a shared util class - vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener2("ln", keyValues )); - } - - /** - * Test validates whether the PoolImpl is created. 
Ideally when a listener is attached - * pool should not be created. - */ - @Test - public void testGatewaySenderEventListenerPoolImpl() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4); - - vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, - false, 100, 10, false, false, null, false, false )); - - vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateNoPoolCreation("ln" )); - } - - // Test start/stop/resume on listener invocation - //this test hangs after the Darrel's checkin 36685. Need to work with Darrel.Commenting it out so that test suit will not hang - @Ignore - @Test - public void testGatewaySenderEventListener_GatewayOperations() { - - int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); - vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); - vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); - vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); - - vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, - false, 100, 10, false, false, null, false, true)); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - final Map initialKeyValues = new HashMap(); - for(int i=0; i< 1000; i++) { - initialKeyValues.put(i, i); - } - - 
vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - initialKeyValues )); - - fail("tried to invoke missing method"); -// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues )); - - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - - final Map keyValues = new HashMap(); - for(int i=1000; i< 2000; i++) { - keyValues.put(i, i); - } - - vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - keyValues )); - - fail("tried to invoke missing method"); -// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - final Map finalKeyValues = new HashMap(); - for(int i=2000; i< 3000; i++) { - finalKeyValues.put(i, i); - } - - vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", - finalKeyValues )); - - finalKeyValues.putAll(initialKeyValues); - fail("tried to invoke missing method"); -// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", finalKeyValues )); - - } - - public static void validateNoPoolCreation(final String siteId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - for(GatewaySender sender: senders) { - if (sender.getId().equals(siteId)) { - AbstractGatewaySender sImpl = (AbstractGatewaySender)sender; - assertNull(sImpl.getProxy()); - } - } - } - - public static void validateReceivedEventsMapSizeListener1(final String senderId, final Map map) { - - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - sender = s; - break; - } - } - - final List<AsyncEventListener> listeners = ((AbstractGatewaySender)sender).getAsyncEventListeners(); - if(listeners.size() == 1) { - final AsyncEventListener l = listeners.get(0); - - WaitCriterion wc = new 
WaitCriterion() { - Map listenerMap; - public boolean done() { - listenerMap = ((MyGatewaySenderEventListener)l) - .getEventsMap(); - boolean sizeCorrect = map.size() == listenerMap.size(); - boolean keySetCorrect = listenerMap.keySet().containsAll(map.keySet()); - boolean valuesCorrect = listenerMap.values().containsAll(map.values()); - return sizeCorrect && keySetCorrect && valuesCorrect; - } - - public String description() { - return "Waiting for all sites to get updated, the sizes are " + listenerMap.size() + " and " + map.size(); - } - }; - Wait.waitForCriterion(wc, 60000, 500, true); - } - } - - public static void validateReceivedEventsMapSizeListener2(final String senderId, final Map map) { - - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for(GatewaySender s : senders){ - if(s.getId().equals(senderId)){ - sender = s; - break; - } - } - - final List<AsyncEventListener> listeners = ((AbstractGatewaySender)sender).getAsyncEventListeners(); - if(listeners.size() == 2) { - final AsyncEventListener l1 = listeners.get(0); - final AsyncEventListener l2 = listeners.get(1); - Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS).until(()->{ - Map listenerMap1 = ((MyGatewaySenderEventListener)l1) - .getEventsMap(); - - Map listenerMap2 = ((MyGatewaySenderEventListener2)l2) - .getEventsMap(); - int listener1MapSize = listenerMap1.size(); - int listener2MapSize = listenerMap1.size(); - int expectedMapSize = map.size(); - boolean sizeCorrect = expectedMapSize == listener1MapSize; - boolean keySetCorrect = listenerMap1.keySet().containsAll(map.keySet()); - boolean valuesCorrect = listenerMap1.values().containsAll(map.values()); - - boolean sizeCorrect2 = expectedMapSize== listener2MapSize; - boolean keySetCorrect2 = listenerMap2.keySet().containsAll(map.keySet()); - boolean valuesCorrect2 = listenerMap2.values().containsAll(map.values()); - - assertEquals("Failed while waiting for all sites to get updated with the correct 
events. \nThe " + - "size of listener 1's map = "+ listener1MapSize + "\n The size of listener 2's map = " + - ""+ listener2MapSize + "\n The expected map size =" + expectedMapSize , - true, sizeCorrect && keySetCorrect && valuesCorrect && sizeCorrect2 && keySetCorrect2 && valuesCorrect2); - }); - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java deleted file mode 100644 index 200e5b6..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java +++ /dev/null @@ -1,665 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.HashSet; -import java.util.Set; - -import com.gemstone.gemfire.cache.RegionDestroyedException; -import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest; -import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.distributed.Locator; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.distributed.internal.InternalLocator; -import com.gemstone.gemfire.distributed.internal.ServerLocator; -import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.RMIException; -import com.gemstone.gemfire.test.dunit.SerializableRunnable; -import com.gemstone.gemfire.test.dunit.VM; - -/** - * - */ -@Category(DistributedTest.class) -public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { - - private static final long serialVersionUID = 1L; - - public SerialGatewaySenderOperationsDUnitTest() { - super(); - } - - @Override - protected final void postSetUpWANTestBase() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - 
IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - IgnoredException.addIgnoredException("Connection refused"); - IgnoredException.addIgnoredException("could not get remote locator information"); - - //Stopping the gateway closed the region, - //which causes this exception to get logged - IgnoredException.addIgnoredException(RegionDestroyedException.class.getSimpleName()); - } - - @Test - public void testSerialGatewaySenderOperationsWithoutStarting() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createSenderCaches(lnPort); - - createSenderVM4(); - createSenderVM5(); - - createReceiverRegions(); - - createSenderRegions(); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 10 )); - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 100 )); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifyGatewaySenderOperations( "ln" )); - vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifyGatewaySenderOperations( "ln" )); - - } - - protected void createSenderRegions() { - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - } - - protected void createReceiverRegions() { - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + 
"_RR", null, isOffHeap() )); - } - - protected void createSenderCaches(Integer lnPort) { - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); - } - - protected void createSenderVM5() { - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - } - - protected void createSenderVM4() { - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - } - - - @Test - public void testStartPauseResumeSerialGatewaySender() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createSenderCaches(lnPort); - - createSenderVM4(); - createSenderVM5(); - - createReceiverRegions(); - - createSenderRegions(); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 10 )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 100 )); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" )); - vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" )); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10 )); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" )); - vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" )); - - try { - inv1.join(); - } catch 
(InterruptedException e) { - e.printStackTrace(); - fail("Interrupted the async invocation."); - } - - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - - validateQueueContents(vm4, "ln", 0); - validateQueueContents(vm5, "ln", 0); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 100 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 100 )); - - } - - @Test - public void testStopSerialGatewaySender() throws Throwable { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createSenderCaches(lnPort); - - createSenderVM4(); - createSenderVM5(); - - createReceiverRegions(); - - createSenderRegions(); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 20 )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 20 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 20 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 20 )); - - vm2.invoke(() -> WANTestBase.stopReceivers()); - vm3.invoke(() -> WANTestBase.stopReceivers()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 20 )); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 )); - vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 )); - - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); - vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - vm5.invoke(() -> 
WANTestBase.validateQueueSizeStat( "ln", 0 )); - /** - * Should have no effect on GatewaySenderState - */ - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); - vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); - - AsyncInvocation vm4async = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); - AsyncInvocation vm5async = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); - int START_WAIT_TIME = 30000; - vm4async.getResult(START_WAIT_TIME); - vm5async.getResult(START_WAIT_TIME); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 )); - vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 )); - - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 110 )); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 )); - vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 )); - - vm2.invoke(() -> WANTestBase.startReceivers()); - vm3.invoke(() -> WANTestBase.startReceivers()); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" )); - vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 110 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 110 )); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - } - - @Test - public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - 
createSenderCaches(lnPort); - - createSenderVM4(); - createSenderVM5(); - - createReceiverRegions(); - - createSenderRegions(); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 100 )); - - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 200 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 200 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 200 )); - - //Do some puts while restarting a sender - AsyncInvocation asyncPuts = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 300 )); - - Thread.sleep(10); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - asyncPuts.getResult(); - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 300 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 300 )); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - - - } - - @Test - public void testStopOneSerialGatewaySender_PrimarySecondary() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createSenderCaches(lnPort); - - createSenderVM4(); - createSenderVM5(); - - createReceiverRegions(); - - createSenderRegions(); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 10 )); - - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); - - vm4.invoke(() -> 
WANTestBase.doPuts( getTestMethodName() + "_RR", - 100 )); - - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 100 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 100 )); - } - - @Test - public void testStopOneSender_StartAnotherSender() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4); - createSenderVM4(); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 10 )); - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); - - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - createSenderVM5(); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 100 )); - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 100 )); - } - - @Test - public void test_Bug44153_StopOneSender_StartAnotherSender_CheckQueueSize() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm4.invoke(() -> 
WANTestBase.createCache( lnPort )); - createSenderVM4(); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 10 )); - validateQueueContents(vm4, "ln", 10); - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - - vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); - - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - createSenderVM5(); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - - vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_RR", 10, 110 )); - - validateQueueContents(vm5, "ln", 100); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - validateQueueContents(vm4, "ln", 10); - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - - vm5.invoke(() -> WANTestBase.startSender( "ln" )); - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - LogWriterUtils.getLogWriter().info("Completed puts in the region"); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 100 )); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 110 )); - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - } - - private void validateQueueContents(VM vm, String site, int size) { - vm.invoke(() -> WANTestBase.validateQueueContents( site, - size )); - } - - /** - * Destroy SerialGatewaySender on all 
the nodes. - */ - @Test - public void testDestroySerialGatewaySenderOnAllNodes() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createSenderCaches(lnPort); - - createSenderVM4(); - createSenderVM5(); - - createReceiverRegions(); - - createSenderRegions(); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 10 )); - - //before destroying, stop the sender - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - - vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" )); - vm5.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" )); - - vm4.invoke(() -> WANTestBase.destroySender( "ln" )); - vm5.invoke(() -> WANTestBase.destroySender( "ln" )); - - vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false )); - vm5.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false )); - } - - /** - * Destroy SerialGatewaySender on a single node. 
- */ - @Test - public void testDestroySerialGatewaySenderOnSingleNode() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createSenderCaches(lnPort); - - createSenderVM4(); - createSenderVM5(); - - createReceiverRegions(); - - createSenderRegions(); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 10 )); - - //before destroying, stop the sender - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - - vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" )); - - vm4.invoke(() -> WANTestBase.destroySender( "ln" )); - - vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false )); - vm5.invoke(() -> WANTestBase.verifySenderRunningState( "ln" )); - } - - /** - * Since the sender is attached to a region and in use, it can not be destroyed. - * Hence, exception is thrown by the sender API. 
- */ - @Test - public void testDestroySerialGatewaySenderExceptionScenario() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createSenderCaches(lnPort); - - createSenderVM4(); - createSenderVM5(); - - createReceiverRegions(); - - createSenderRegions(); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 10 )); - - try { - vm4.invoke(() -> WANTestBase.destroySender( "ln" )); - } catch (RMIException e) { - assertTrue("Cause of the exception should be GatewaySenderException", e.getCause() instanceof GatewaySenderException); - } - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10 )); - } - - - @Test - public void testGatewaySenderNotRegisteredAsCacheServer() { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - - startSenderInVMs("ln", vm4, vm5); - - SerializableRunnable check = new SerializableRunnable("assert no cache servers") { - public void run() { - InternalLocator inl = (InternalLocator)Locator.getLocator(); - ServerLocator server = inl.getServerLocatorAdvisee(); - LogWriterUtils.getLogWriter().info("Server load map is " + server.getLoadMap()); - assertTrue("expected an empty map but found " + server.getLoadMap(), - server.getLoadMap().isEmpty()); - QueueConnectionRequest request = new QueueConnectionRequest( - 
ClientProxyMembershipID.getNewProxyMembership(InternalDistributedSystem.getConnectedInstance()), - 1, new HashSet<>(), "", false); - QueueConnectionResponse response = (QueueConnectionResponse)server.processRequest(request); - assertTrue("expected no servers but found " + response.getServers(), - response.getServers().isEmpty()); - } - }; - vm0.invoke(check); - vm1.invoke(check); - - } - - - - public static void verifySenderPausedState(String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender)s; - break; - } - } - assertTrue(sender.isPaused()); - } - - public static void verifySenderResumedState(String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender)s; - break; - } - } - assertFalse(sender.isPaused()); - assertTrue(sender.isRunning()); - } - - public static void verifySenderStoppedState(String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender)s; - break; - } - } - assertFalse(sender.isRunning()); - assertFalse(sender.isPaused()); - } - - public static void verifyGatewaySenderOperations(String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - assertFalse(sender.isPaused()); - assertFalse(((AbstractGatewaySender)sender).isRunning()); - sender.pause(); - sender.resume(); - sender.stop(); - } -}
