http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java new file mode 100644 index 0000000..b754254 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.parallel; + +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import static com.gemstone.gemfire.test.dunit.Wait.*; +import static com.gemstone.gemfire.test.dunit.IgnoredException.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +@Category(DistributedTest.class) +public class ParallelWANStatsDUnitTest extends WANTestBase{ + + private static final int NUM_PUTS = 100; + private static final long serialVersionUID = 1L; + + private String testName; + + public ParallelWANStatsDUnitTest() { + super(); + } + + @Override + protected final void postSetUpWANTestBase() throws Exception { + this.testName = getTestMethodName(); + } + + @Test + public void testPartitionedRegionParallelPropagation_BeforeDispatch() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSendersWithConflation(lnPort); + + createSenderPRs(0); + + startPausedSenders(); + + createReceiverPR(vm2, 1); + createReceiverPR(vm3, 1); + + putKeyValues(); + + ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS )); + ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS )); + ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS )); + ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", NUM_PUTS )); + + assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size + assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived + assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued + assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed + assertEquals(0, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); //batches distributed + assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed + + } + + @Test + public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundancy() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + + createSenders(lnPort); + + createReceiverPR(vm2, 0); + + createSenderPRs(0); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.doPuts( testName, + NUM_PUTS )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, NUM_PUTS )); + + + ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + + assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size + assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived + assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued + assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed + assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed + assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS )); + } + + @Test + public void testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + + createSenders(lnPort); + + createReceiverPR(vm2, 0); + + createSenderPRs(3); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.doPuts( testName, + NUM_PUTS )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, NUM_PUTS )); + + ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + + assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size + assertEquals(400, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived + assertEquals(400, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued + assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed + assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed + assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS )); + } + + @Test + public void testWANStatsTwoWanSites_Bug44331() throws Exception { + Integer lnPort = createFirstLocatorWithDSId(1); + Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + + createCacheInVMs(nyPort, vm2); + createCacheInVMs(tkPort, vm3); + createReceiverInVMs(vm2); + createReceiverInVMs(vm3); + + vm4.invoke(() -> WANTestBase.createCache(lnPort )); + + vm4.invoke(() -> WANTestBase.createSender( "ln1", + 2, true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createSender( "ln2", + 3, true, 100, 10, false, false, null, true )); + + createReceiverPR(vm2, 0); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + testName, null, 0, 10, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln1,ln2", 0, 10, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.startSender( "ln1" )); + + vm4.invoke(() -> WANTestBase.startSender( "ln2" )); + + vm4.invoke(() -> WANTestBase.doPuts( testName, + NUM_PUTS )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, NUM_PUTS )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + testName, NUM_PUTS )); + + ArrayList<Integer> v4Sender1List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln1", 0 )); + ArrayList<Integer> v4Sender2List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln2", 0 )); + + assertEquals(0, v4Sender1List.get(0).intValue()); //queue size + assertEquals(NUM_PUTS, v4Sender1List.get(1).intValue()); //eventsReceived + assertEquals(NUM_PUTS, v4Sender1List.get(2).intValue()); //events queued + assertEquals(NUM_PUTS, v4Sender1List.get(3).intValue()); //events distributed + assertTrue(v4Sender1List.get(4).intValue()>=10); //batches distributed + assertEquals(0, v4Sender1List.get(5).intValue()); //batches redistributed + + assertEquals(0, v4Sender2List.get(0).intValue()); //queue size + assertEquals(NUM_PUTS, v4Sender2List.get(1).intValue()); //eventsReceived + assertEquals(NUM_PUTS, v4Sender2List.get(2).intValue()); //events queued + assertEquals(NUM_PUTS, v4Sender2List.get(3).intValue()); //events distributed + assertTrue(v4Sender2List.get(4).intValue()>=10); //batches distributed + assertEquals(0, v4Sender2List.get(5).intValue()); //batches redistributed + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS )); + vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(10, NUM_PUTS, NUM_PUTS )); + } + + @Test + public void testParallelPropagationHA() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + + createSenders(lnPort); + + createReceiverPR(vm2, 0); + + createSenderPRs(3); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( testName, 1000 )); + pause(200); + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + inv1.join(); + inv2.join(); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, 1000 )); + + ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0)); + ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0)); + ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0)); + + assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size + int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1); + //We may see a single retried event on all members due to the kill + assertTrue("Received " + receivedEvents, 3000 <= receivedEvents && 3003 >= receivedEvents); //eventsReceived + int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2); + assertTrue("Queued " + queuedEvents, 3000 <= queuedEvents && 3003 >= queuedEvents); //eventsQueued + //assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed : its quite possible that vm4 has distributed some of the events + //assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : its quite possible that vm4 has distributed some of the batches. + assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(NUM_PUTS, 1000, 1000 )); + } + + /** + * 1 region and sender configured on local site and 1 region and a + * receiver configured on remote site. Puts to the local region are in progress. + * Remote region is destroyed in the middle. + * + * @throws Exception + */ + @Test + public void testParallelPropagationWithRemoteRegionDestroy() throws Exception { + addIgnoredException("RegionDestroyedException"); + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + createReceiverPR(vm2, 0); + createReceiverInVMs(vm2); + + createSenders(lnPort); + + vm2.invoke(() -> WANTestBase.addCacheListenerAndDestroyRegion( + testName)); + + createSenderPRs(0); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + //start puts in RR_1 in another thread + vm4.invoke(() -> WANTestBase.doPuts( testName, 2000 )); + + //verify that all is well in local site. All the events should be present in local region + vm4.invoke(() -> WANTestBase.validateRegionSize( + testName, 2000 )); + + ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", -1)); + ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", -1)); + ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", -1)); + ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", -1)); + + + assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1); //batches distributed : its quite possible that vm4 has distributed some of the batches. + assertTrue(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1); //batches redistributed + } + + @Test + public void testParallelPropagationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); + + createCacheInVMs(nyPort, vm2); + + createReceiverPR(vm2, 1); + + createReceiverInVMs(vm2); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + createSenderPRs(0); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + + + vm4.invoke(() -> WANTestBase.doPuts( testName, 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, 800 )); + + ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + + assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size + assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived + assertEquals(900, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued + assertEquals(800, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed + assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 80); //batches distributed + assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed + assertEquals(200, v4List.get(6) + v5List.get(6) + v6List.get(6) + v7List.get(6)); //events filtered + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(80, 800, 800)); + } + + @Test + public void testParallelPropagationConflation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + createReceiverInVMs(vm2); + + createSendersWithConflation(lnPort); + + createSenderPRs(0); + + startPausedSenders(); + + createReceiverPR(vm2, 1); + + Map keyValues = putKeyValues(); + final Map updateKeyValues = new HashMap(); + for(int i=0;i<50;i++) { + updateKeyValues.put(i, i+"_updated"); + } + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues )); + + vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/ )); + + // Do the puts again. Since these are updates, the previous updates will be conflated. + vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues )); + + vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() /*creates aren't conflated*/ )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, 0 )); + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0)); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); + + keyValues.putAll(updateKeyValues); + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, keyValues.size() )); + + vm2.invoke(() -> WANTestBase.validateRegionContents( + testName, keyValues )); + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 150, NUM_PUTS)); + + vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 0 )); + + ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> WANTestBase.getSenderStats( "ln", 0 )); + + assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size + assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived + assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued + assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed + assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed + assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed + assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); //events conflated + + } + + protected Map putKeyValues() { + final Map keyValues = new HashMap(); + for(int i=0; i< NUM_PUTS; i++) { + keyValues.put(i, i); + } + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, keyValues )); + + vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() )); + + return keyValues; + } + + protected void createReceiverPR(VM vm, int redundancy) { + vm.invoke(() -> WANTestBase.createPartitionedRegion( + testName, null, redundancy, 10, isOffHeap() )); + } + + protected void createSenderPRs(int redundancy) { + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", redundancy, 10, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", redundancy, 10, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", redundancy, 10, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", redundancy, 10, isOffHeap() )); + } + + protected void startPausedSenders() { + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm4.invoke(() ->pauseSender( "ln" )); + vm5.invoke(() ->pauseSender( "ln" )); + vm6.invoke(() ->pauseSender( "ln" )); + vm7.invoke(() ->pauseSender( "ln" )); + } + + protected void createSendersWithConflation(Integer lnPort) { + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, true, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, true, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, true, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, true, false, null, true )); + } + + protected void createSenders(Integer lnPort) { + vm4.invoke(() -> WANTestBase.createCache(lnPort )); + vm5.invoke(() -> WANTestBase.createCache(lnPort )); + vm6.invoke(() -> WANTestBase.createCache(lnPort )); + vm7.invoke(() -> WANTestBase.createCache(lnPort )); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java new file mode 100644 index 0000000..b38d35b --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.CacheTransactionManager; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.execute.*; +import com.gemstone.gemfire.distributed.DistributedSystem; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.Wait; + +//The tests here are to validate changes introduced because a distributed deadlock +//was found that caused issues for a production customer. +// +//There are 4 tests which use sender gateways with primaries on different +//JVM's. Two tests use replicated and two use partition regions and the +//the tests vary the conserve-sockets. +// +//currently the 4th test using PR, conserve-sockets=true hangs/fails and is commented +//out to prevent issues +@Category(DistributedTest.class) +public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase { + + public SerialGatewaySenderDistributedDeadlockDUnitTest() { + super(); + } + + //Uses replicated regions and conserve-sockets=false + @Test + public void testPrimarySendersOnDifferentVMsReplicated() throws Exception { + + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); + + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCachesWith(Boolean.FALSE, nyPort, lnPort); + + createSerialSenders(); + + createReplicatedRegions(nyPort); + + //get one primary sender on vm4 and another primary on vm5 + //the startup order matters here + startSerialSenders(); + + //exercise region and gateway operations with different messaging + exerciseWANOperations(); + AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")); + AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")); + AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + + exerciseFunctions(); + + try { + invVM4transaction.join(); + invVM5transaction.join(); + invVM4.join(); + invVM5.join(); + + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + } + + //Uses partitioned regions and conserve-sockets=false + @Test + public void testPrimarySendersOnDifferentVMsPR() throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); + + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCachesWith(Boolean.FALSE, nyPort, lnPort); + + createSerialSenders(); + + createPartitionedRegions(nyPort); + + startSerialSenders(); + + exerciseWANOperations(); + AsyncInvocation invVM4transaction + = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)); + AsyncInvocation invVM5transaction + = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)); + + AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + + exerciseFunctions(); + + try { + invVM4transaction.join(); + invVM5transaction.join(); + invVM4.join(); + invVM5.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + } + + //Uses replicated regions and conserve-sockets=true + @Test + public void testPrimarySendersOnDifferentVMsReplicatedSocketPolicy() throws Exception { + + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); + + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCachesWith(Boolean.TRUE, nyPort, lnPort); + + createSerialSenders(); + + createReplicatedRegions(nyPort); + + //get one primary sender on vm4 and another primary on vm5 + //the startup order matters here + startSerialSenders(); + + //exercise region and gateway operations with messaging + exerciseWANOperations(); + AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")); + AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR")); + + AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + + exerciseFunctions(); + + try { + invVM4transaction.join(); + invVM5transaction.join(); + invVM4.join(); + invVM5.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + } + + //Uses partitioned regions and conserve-sockets=true + //this always causes a distributed deadlock + @Test + public void testPrimarySendersOnDifferentVMsPRSocketPolicy() throws Exception { + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstPeerLocator(1)); + + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + + createCachesWith(Boolean.TRUE, nyPort, lnPort); + + createSerialSenders(); + + createPartitionedRegions(nyPort); + + startSerialSenders(); + + exerciseWANOperations(); + AsyncInvocation invVM4transaction + = vm4.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)); + AsyncInvocation invVM5transaction + = vm5.invokeAsync(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doTxPutsPR(getTestMethodName() + "_RR", 100, 1000)); + + AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + + exerciseFunctions(); + + try { + invVM4transaction.join(); + invVM5transaction.join(); + invVM4.join(); + invVM5.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + } + + //************************************************************************** + //Utility methods used by tests + //************************************************************************** + private void createReplicatedRegions(Integer nyPort) throws Exception { + //create receiver + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, false)); + vm2.invoke(() -> WANTestBase.createReceiver()); + + //create senders + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1,ln2", false)); + + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln1,ln2", false)); + } + + private void createCachesWith(Boolean socketPolicy, Integer nyPort, Integer lnPort) { + vm2.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, nyPort)); + + vm4.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort)); + + vm5.invoke(() -> WANTestBase.createCacheConserveSockets(socketPolicy, lnPort)); + } + + private void exerciseFunctions() throws Exception { + //do function calls that use a shared connection + for (int x = 0; x < 1000; x++) { + //setting it to Boolean.TRUE it should pass the test + vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE)); + vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.TRUE)); + } + for (int x = 0; x < 1000; x++) { + //setting the Boolean.FALSE below will cause a deadlock in some GFE versions + //setting it to Boolean.TRUE as above it should pass the test + //this is similar to the customer found distributed deadlock + vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE)); + vm5.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doFunctionPuts(getTestMethodName() + "_RR", 1, Boolean.FALSE)); + } + } + + private void createPartitionedRegions(Integer nyPort) throws Exception { + //create remote receiver + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_RR", + "", 0, 113, false)); + + vm2.invoke(() -> WANTestBase.createReceiver()); + + //create sender vms + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false)); + + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_RR", "ln1,ln2", 1, 113, false)); + } + + private void exerciseWANOperations() throws Exception { + //note - some of these should be made async to truly exercise the + //messaging between the WAN gateways and members + + //exercise region and gateway operations + vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100)); + vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100)); + Wait.pause(2000); //wait for events to propagate + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100)); + vm5.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 100)); + Wait.pause(2000);//wait for events to propagate + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0)); + vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100)); + vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100)); + Wait.pause(2000); //wait for events to propagate + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100)); + vm4.invoke(() -> SerialGatewaySenderDistributedDeadlockDUnitTest.doInvalidates(getTestMethodName() + "_RR", 100, 100)); + vm4.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10)); + vm5.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_RR", 100, 10)); + Wait.pause(2000);//wait for events to propagate + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000)); + vm4.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_RR", 1000)); + Wait.pause(2000);//wait for events to propagate + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0)); + vm4.invoke(() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_RR", 100)); + Wait.pause(2000); + vm5.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_RR", 100)); + } + + private void startSerialSenders() throws Exception { + //get one primary sender on vm4 and another primary on vm5 + //the startup order matters here so that primaries are + //on different JVM's + vm4.invoke(() -> WANTestBase.startSender("ln1")); + + vm5.invoke(() -> WANTestBase.startSender("ln2")); + + //start secondaries + vm5.invoke(() -> WANTestBase.startSender("ln1")); + + vm4.invoke(() -> WANTestBase.startSender("ln2")); + } + + private void createSerialSenders() throws Exception { + + vm4.invoke(() -> WANTestBase.createSender("ln1", 2, + false, 100, 10, false, false, null, true)); + + vm5.invoke(() -> WANTestBase.createSender("ln1", 2, + false, 100, 10, false, false, null, true)); + + vm4.invoke(() -> WANTestBase.createSender("ln2", 2, + false, 100, 10, false, false, null, true)); + + vm5.invoke(() -> WANTestBase.createSender("ln2", 2, + false, 100, 10, false, false, null, true)); + } + + public static void doFunctionPuts(String name, int num, Boolean useThreadOwnedSocket) throws Exception { + Region region = CacheFactory.getAnyInstance().getRegion(name); + FunctionService.registerFunction(new TestFunction()); + Execution exe = FunctionService.onRegion(region); + for (int x = 0; x < num; x++) { + exe.withArgs(useThreadOwnedSocket).execute("com.gemstone.gemfire.internal.cache.wan.serial.TestFunction"); + } + } + + public static void doTxPutsPR(String regionName, int numPuts, int size) throws Exception { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + CacheTransactionManager mgr = cache.getCacheTransactionManager(); + for (int x = 0; x < numPuts; x++) { + int temp = (int) (Math.floor(Math.random() * size)); + try { + mgr.begin(); + r.put(temp, temp); + mgr.commit(); + } catch (com.gemstone.gemfire.cache.TransactionDataNotColocatedException txe) { + //ignore colocation issues or primary bucket issues + } catch (com.gemstone.gemfire.cache.CommitConflictException cce) { + //ignore - conflicts are ok and expected + } + } + } + + public static void doInvalidates(String regionName, int numInvalidates, int size) throws Exception { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + for (int x = 0; x < numInvalidates; x++) { + int temp = (int) (Math.floor(Math.random() * size)); + try { + if (r.containsValueForKey(temp)) { + r.invalidate(temp); + } + } catch (com.gemstone.gemfire.cache.EntryNotFoundException entryNotFoundException) { + //ignore as an entry may not exist + } + } + } + +} + +class TestFunction implements Function { + + @Override + public boolean hasResult() { + return false; + } + + @Override + public void execute(FunctionContext fc) { + boolean option = (Boolean) fc.getArguments(); + if (option) { + DistributedSystem.setThreadsSocketPolicy(false); + } + RegionFunctionContext context = (RegionFunctionContext) fc; + Region local = context.getDataSet(); + local.put(randKeyValue(10), randKeyValue(10000)); + } + + @Override + public String getId() { + return this.getClass().getName(); + } + + @Override + public boolean optimizeForWrite() { + return false; + } + + @Override + public boolean isHA() { + return false; + } + + private int randKeyValue(int size) { + double temp = Math.floor(Math.random() * size); + return (int) temp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java new file mode 100644 index 0000000..8c255c1 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import com.jayway.awaitility.Awaitility; +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener; +import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener2; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +@Category(DistributedTest.class) +public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + public SerialGatewaySenderEventListenerDUnitTest() { + super(); + } + + /** + * Test validates whether the listener attached receives all the events. + * this test hangs after the Darrel's checkin 36685. Need to work with Darrel.Commenting it out so that test suit will not hang + */ + @Ignore + @Test + public void testGatewaySenderEventListenerInvocationWithoutLocator() { + int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); + vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); + vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); + vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); + + vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, + false, 100, 10, false, false, null, false, true)); + vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, + false, 100, 10, false, false, null, false, true)); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + final Map keyValues = new HashMap(); + for(int i=0; i< 1000; i++) { + keyValues.put(i, i); + } + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", + keyValues )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", keyValues.size() )); + + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", keyValues.size() )); + + vm4.invoke(() -> WANTestBase.printEventListenerMap()); + vm5.invoke(() -> WANTestBase.printEventListenerMap()); + + fail("tried to invoke missing method"); +// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues )); + } + + /** + * Test validates whether the listener attached receives all the events. + */ + @Test + public void testGatewaySenderEventListenerInvocation() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, + false, 100, 10, false, false, null, false, true)); + vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, + false, 100, 10, false, false, null, false, true)); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + final Map keyValues = new HashMap(); + for(int i=0; i< 1000; i++) { + keyValues.put(i, i); + } + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", + keyValues )); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", keyValues.size() )); + + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", keyValues.size() )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0 )); + + vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", keyValues )); + } + + /** + * Test validates whether the listener attached receives all the events. + * When there are 2 listeners attached to the GatewaySender. + */ + @Test + public void testGatewaySender2EventListenerInvocation() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, + false, 100, 10, false, false, null, true, true)); + vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, + false, 100, 10, false, false, null, true, true)); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + final Map keyValues = new HashMap(); + for(int i=0; i< 1000; i++) { + keyValues.put(i, i); + } + + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", + keyValues )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0 )); + + // TODO: move validateReceivedEventsMapSizeListener2 to a shared util class + vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener2("ln", keyValues )); + } + + /** + * Test validates whether the PoolImpl is created. Ideally when a listener is attached + * pool should not be created. + */ + @Test + public void testGatewaySenderEventListenerPoolImpl() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4); + + vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, + false, 100, 10, false, false, null, false, false )); + + vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateNoPoolCreation("ln" )); + } + + // Test start/stop/resume on listener invocation + //this test hangs after the Darrel's checkin 36685. Need to work with Darrel.Commenting it out so that test suit will not hang + @Ignore + @Test + public void testGatewaySenderEventListener_GatewayOperations() { + + int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + vm4.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); + vm5.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); + vm6.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); + vm7.invoke(() -> WANTestBase.createCacheWithoutLocator(mPort )); + + vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2, + false, 100, 10, false, false, null, false, true)); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + final Map initialKeyValues = new HashMap(); + for(int i=0; i< 1000; i++) { + initialKeyValues.put(i, i); + } + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", + initialKeyValues )); + + fail("tried to invoke missing method"); +// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues )); + + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + + final Map keyValues = new HashMap(); + for(int i=1000; i< 2000; i++) { + keyValues.put(i, i); + } + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", + keyValues )); + + fail("tried to invoke missing method"); +// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", initialKeyValues )); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + final Map finalKeyValues = new HashMap(); + for(int i=2000; i< 3000; i++) { + finalKeyValues.put(i, i); + } + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( getTestMethodName() + "_RR", + finalKeyValues )); + + finalKeyValues.putAll(initialKeyValues); + fail("tried to invoke missing method"); +// vm4.invoke(() -> SerialGatewaySenderEventListenerDUnitTest.validateReceivedEventsMapSizeListener1("ln", finalKeyValues )); + + } + + public static void validateNoPoolCreation(final String siteId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + for(GatewaySender sender: senders) { + if (sender.getId().equals(siteId)) { + AbstractGatewaySender sImpl = (AbstractGatewaySender)sender; + assertNull(sImpl.getProxy()); + } + } + } + + public static void validateReceivedEventsMapSizeListener1(final String senderId, final Map map) { + + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for(GatewaySender s : senders){ + if(s.getId().equals(senderId)){ + sender = s; + break; + } + } + + final List<AsyncEventListener> listeners = ((AbstractGatewaySender)sender).getAsyncEventListeners(); + if(listeners.size() == 1) { + final AsyncEventListener l = listeners.get(0); + + WaitCriterion wc = new WaitCriterion() { + Map listenerMap; + public boolean done() { + listenerMap = ((MyGatewaySenderEventListener)l) + .getEventsMap(); + boolean sizeCorrect = map.size() == listenerMap.size(); + boolean keySetCorrect = listenerMap.keySet().containsAll(map.keySet()); + boolean valuesCorrect = listenerMap.values().containsAll(map.values()); + return sizeCorrect && keySetCorrect && valuesCorrect; + } + + public String description() { + return "Waiting for all sites to get updated, the sizes are " + listenerMap.size() + " and " + map.size(); + } + }; + Wait.waitForCriterion(wc, 60000, 500, true); + } + } + + public static void validateReceivedEventsMapSizeListener2(final String senderId, final Map map) { + + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for(GatewaySender s : senders){ + if(s.getId().equals(senderId)){ + sender = s; + break; + } + } + + final List<AsyncEventListener> listeners = ((AbstractGatewaySender)sender).getAsyncEventListeners(); + if(listeners.size() == 2) { + final AsyncEventListener l1 = listeners.get(0); + final AsyncEventListener l2 = listeners.get(1); + Awaitility.await().atMost(60000, TimeUnit.MILLISECONDS).until(()->{ + Map listenerMap1 = ((MyGatewaySenderEventListener)l1) + .getEventsMap(); + + Map listenerMap2 = ((MyGatewaySenderEventListener2)l2) + .getEventsMap(); + int listener1MapSize = listenerMap1.size(); + int listener2MapSize = listenerMap1.size(); + int expectedMapSize = map.size(); + boolean sizeCorrect = expectedMapSize == listener1MapSize; + boolean keySetCorrect = listenerMap1.keySet().containsAll(map.keySet()); + boolean valuesCorrect = listenerMap1.values().containsAll(map.values()); + + boolean sizeCorrect2 = expectedMapSize== listener2MapSize; + boolean keySetCorrect2 = listenerMap2.keySet().containsAll(map.keySet()); + boolean valuesCorrect2 = listenerMap2.values().containsAll(map.values()); + + assertEquals("Failed while waiting for all sites to get updated with the correct events. \nThe " + + "size of listener 1's map = "+ listener1MapSize + "\n The size of listener 2's map = " + + ""+ listener2MapSize + "\n The expected map size =" + expectedMapSize , + true, sizeCorrect && keySetCorrect && valuesCorrect && sizeCorrect2 && keySetCorrect2 && valuesCorrect2); + }); + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java new file mode 100644 index 0000000..200e5b6 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java @@ -0,0 +1,665 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.HashSet; +import java.util.Set; + +import com.gemstone.gemfire.cache.RegionDestroyedException; +import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest; +import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.InternalLocator; +import com.gemstone.gemfire.distributed.internal.ServerLocator; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.RMIException; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.VM; + +/** + * + */ +@Category(DistributedTest.class) +public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + public SerialGatewaySenderOperationsDUnitTest() { + super(); + } + + @Override + protected final void postSetUpWANTestBase() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + IgnoredException.addIgnoredException("Connection refused"); + IgnoredException.addIgnoredException("could not get remote locator information"); + + //Stopping the gateway closed the region, + //which causes this exception to get logged + IgnoredException.addIgnoredException(RegionDestroyedException.class.getSimpleName()); + } + + @Test + public void testSerialGatewaySenderOperationsWithoutStarting() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 10 )); + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 100 )); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifyGatewaySenderOperations( "ln" )); + vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifyGatewaySenderOperations( "ln" )); + + } + + protected void createSenderRegions() { + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + } + + protected void createReceiverRegions() { + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + } + + protected void createSenderCaches(Integer lnPort) { + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm6.invoke(() -> WANTestBase.createCache( lnPort )); + vm7.invoke(() -> WANTestBase.createCache( lnPort )); + } + + protected void createSenderVM5() { + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + } + + protected void createSenderVM4() { + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + } + + + @Test + public void testStartPauseResumeSerialGatewaySender() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 10 )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 100 )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" )); + vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderPausedState( "ln" )); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10 )); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" )); + vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" )); + + try { + inv1.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail("Interrupted the async invocation."); + } + + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + validateQueueContents(vm4, "ln", 0); + validateQueueContents(vm5, "ln", 0); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 100 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 100 )); + + } + + @Test + public void testStopSerialGatewaySender() throws Throwable { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 20 )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 20 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 20 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 20 )); + + vm2.invoke(() -> WANTestBase.stopReceivers()); + vm3.invoke(() -> WANTestBase.stopReceivers()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 20 )); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 )); + vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 )); + + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); + vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + /** + * Should have no effect on GatewaySenderState + */ + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); + vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); + + AsyncInvocation vm4async = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); + AsyncInvocation vm5async = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" )); + int START_WAIT_TIME = 30000; + vm4async.getResult(START_WAIT_TIME); + vm5async.getResult(START_WAIT_TIME); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 )); + vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 20 )); + + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 110 )); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 )); + vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 130 )); + + vm2.invoke(() -> WANTestBase.startReceivers()); + vm3.invoke(() -> WANTestBase.startReceivers()); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" )); + vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderResumedState( "ln" )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 110 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 110 )); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + } + + @Test + public void testStopOneSerialGatewaySenderBothPrimary() throws Throwable { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 100 )); + + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 200 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 200 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 200 )); + + //Do some puts while restarting a sender + AsyncInvocation asyncPuts = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 300 )); + + Thread.sleep(10); + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + asyncPuts.getResult(); + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 300 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 300 )); + + vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); + + + } + + @Test + public void testStopOneSerialGatewaySender_PrimarySecondary() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 10 )); + + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 100 )); + + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 100 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 100 )); + } + + @Test + public void testStopOneSender_StartAnotherSender() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4); + createSenderVM4(); + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 10 )); + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); + + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + createSenderVM5(); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.startSender( "ln" )); + + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 100 )); + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 100 )); + } + + @Test + public void test_Bug44153_StopOneSender_StartAnotherSender_CheckQueueSize() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + createSenderVM4(); + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 10 )); + validateQueueContents(vm4, "ln", 10); + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + + vm4.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); + + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + createSenderVM5(); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.startSender( "ln" )); + + vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_RR", 10, 110 )); + + validateQueueContents(vm5, "ln", 100); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + vm5.invoke(() -> SerialGatewaySenderOperationsDUnitTest.verifySenderStoppedState( "ln" )); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + validateQueueContents(vm4, "ln", 10); + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + + vm5.invoke(() -> WANTestBase.startSender( "ln" )); + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + LogWriterUtils.getLogWriter().info("Completed puts in the region"); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 100 )); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 110 )); + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + } + + private void validateQueueContents(VM vm, String site, int size) { + vm.invoke(() -> WANTestBase.validateQueueContents( site, + size )); + } + + /** + * Destroy SerialGatewaySender on all the nodes. + */ + @Test + public void testDestroySerialGatewaySenderOnAllNodes() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 10 )); + + //before destroying, stop the sender + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + vm5.invoke(() -> WANTestBase.stopSender( "ln" )); + + vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" )); + vm5.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" )); + + vm4.invoke(() -> WANTestBase.destroySender( "ln" )); + vm5.invoke(() -> WANTestBase.destroySender( "ln" )); + + vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false )); + vm5.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false )); + } + + /** + * Destroy SerialGatewaySender on a single node. + */ + @Test + public void testDestroySerialGatewaySenderOnSingleNode() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 10 )); + + //before destroying, stop the sender + vm4.invoke(() -> WANTestBase.stopSender( "ln" )); + + vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( "ln", getTestMethodName() + "_RR" )); + + vm4.invoke(() -> WANTestBase.destroySender( "ln" )); + + vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", false )); + vm5.invoke(() -> WANTestBase.verifySenderRunningState( "ln" )); + } + + /** + * Since the sender is attached to a region and in use, it can not be destroyed. + * Hence, exception is thrown by the sender API. + */ + @Test + public void testDestroySerialGatewaySenderExceptionScenario() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createSenderCaches(lnPort); + + createSenderVM4(); + createSenderVM5(); + + createReceiverRegions(); + + createSenderRegions(); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 10 )); + + try { + vm4.invoke(() -> WANTestBase.destroySender( "ln" )); + } catch (RMIException e) { + assertTrue("Cause of the exception should be GatewaySenderException", e.getCause() instanceof GatewaySenderException); + } + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10 )); + } + + + @Test + public void testGatewaySenderNotRegisteredAsCacheServer() { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + + startSenderInVMs("ln", vm4, vm5); + + SerializableRunnable check = new SerializableRunnable("assert no cache servers") { + public void run() { + InternalLocator inl = (InternalLocator)Locator.getLocator(); + ServerLocator server = inl.getServerLocatorAdvisee(); + LogWriterUtils.getLogWriter().info("Server load map is " + server.getLoadMap()); + assertTrue("expected an empty map but found " + server.getLoadMap(), + server.getLoadMap().isEmpty()); + QueueConnectionRequest request = new QueueConnectionRequest( + ClientProxyMembershipID.getNewProxyMembership(InternalDistributedSystem.getConnectedInstance()), + 1, new HashSet<>(), "", false); + QueueConnectionResponse response = (QueueConnectionResponse)server.processRequest(request); + assertTrue("expected no servers but found " + response.getServers(), + response.getServers().isEmpty()); + } + }; + vm0.invoke(check); + vm1.invoke(check); + + } + + + + public static void verifySenderPausedState(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + assertTrue(sender.isPaused()); + } + + public static void verifySenderResumedState(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + assertFalse(sender.isPaused()); + assertTrue(sender.isRunning()); + } + + public static void verifySenderStoppedState(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + assertFalse(sender.isRunning()); + assertFalse(sender.isPaused()); + } + + public static void verifyGatewaySenderOperations(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + assertFalse(sender.isPaused()); + assertFalse(((AbstractGatewaySender)sender).isRunning()); + sender.pause(); + sender.resume(); + sender.stop(); + } +}
