http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java new file mode 100644 index 0000000..47dee96 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.CancelException; +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.internal.cache.ForceReattemptException; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +@Category(DistributedTest.class) +public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + public SerialWANPropagation_PartitionedRegionDUnitTest() { + super(); + } + + @Test + public void testPartitionedSerialPropagation() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + //vm5.invoke(() -> WANTestBase.startSender( "ln" )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testBothReplicatedAndPartitionedSerialPropagation() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testSerialReplicatedAndPartitionedPropagation() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial", + 2, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial", + 2, false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.startSender( "lnSerial" )); + vm5.invoke(() -> WANTestBase.startSender( "lnSerial" )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testSerialReplicatedAndSerialPartitionedPropagation() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + + vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", + 2, false, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "lnSerial2", + 2, false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); + + startSenderInVMs("lnSerial1", vm4, vm5); + + startSenderInVMs("lnSerial2", vm5, vm6); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testPartitionedSerialPropagationToTwoWanSites() throws Exception { + + Integer lnPort = createFirstLocatorWithDSId(1); + Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort )); + Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3,lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + createCacheInVMs(tkPort, vm3); + vm3.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + startSenderInVMs("lnSerial1", vm4, vm5); + startSenderInVMs("lnSerial2", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testPartitionedSerialPropagationHA() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //do initial 100 puts to create all the buckets + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); + + IgnoredException.addIgnoredException(CancelException.class.getName()); + IgnoredException.addIgnoredException(CacheClosedException.class.getName()); + IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + //start async puts + AsyncInvocation inv = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); + //close the cache on vm4 in between the puts + vm4.invoke(() -> WANTestBase.killSender()); + + inv.join(); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } + + @Test + public void testPartitionedSerialPropagationWithParallelThreads() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + createReceiverInVMs(vm2, vm3); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + startSenderInVMs("ln", vm4, vm5); + + + + vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( // TODO: eats exceptions + getTestMethodName() + "_PR", 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 1000 )); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java new file mode 100644 index 0000000..f3b6765 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; + + +@Category(DistributedTest.class) +public class SerialWANPropagationsFeatureDUnitTest extends WANTestBase{ + + private static final long serialVersionUID = 1L; + + public SerialWANPropagationsFeatureDUnitTest() { + super(); + } + + @Test + public void testSerialReplicatedWanWithOverflow() { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 10, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 10, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + vm2.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); + vm3.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doHeavyPuts( + getTestMethodName() + "_RR", 15 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 15, 240000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 15, 240000 )); + } + + @Test + public void testSerialReplicatedWanWithPersistence() { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, true, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } + + @Test + public void testReplicatedSerialPropagationWithConflation() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 1000, true, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 1000, true, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testReplicatedSerialPropagationWithParallelThreads() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( + getTestMethodName() + "_RR", 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testSerialPropagationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + } + + @Test + public void testReplicatedSerialPropagationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + } + + @Test + public void testReplicatedSerialPropagationWithFilter_AfterAck() + throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm6, vm7); + createReceiverInVMs(vm6, vm7); + + createCacheInVMs(lnPort, vm2, vm3, vm4, vm5); + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, + new MyGatewayEventFilter_AfterAck(), true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, + new MyGatewayEventFilter_AfterAck(), true )); + + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", + 0 )); + + Integer vm4Acks = (Integer)vm4.invoke(() -> WANTestBase.validateAfterAck( "ln")); + Integer vm5Acks = (Integer)vm5.invoke(() -> WANTestBase.validateAfterAck( "ln")); + + assertEquals(2000, (vm4Acks + vm5Acks)); + + vm6.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + vm7.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 1000 )); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java new file mode 100644 index 0000000..2019cac --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANStatsDUnitTest.java @@ -0,0 +1,588 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.serial; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import static com.gemstone.gemfire.test.dunit.Wait.*; +import static com.gemstone.gemfire.test.dunit.IgnoredException.*; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.jayway.awaitility.Awaitility; + +@Category(DistributedTest.class) +public class SerialWANStatsDUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + private String testName; + + public SerialWANStatsDUnitTest() { + super(); + } + + @Override + protected final void postSetUpWANTestBase() throws Exception { + this.testName = getTestMethodName(); + addIgnoredException("java.net.ConnectException"); + addIgnoredException("java.net.SocketException"); + addIgnoredException("Unexpected IOException"); + } + + @Test + public void testReplicatedSerialPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + + vm5.invoke(() -> WANTestBase.doPuts( testName + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName + "_RR", 1000 )); + + pause(2000); + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 )); + + vm4.invoke(() -> WANTestBase.checkQueueStats("ln", + 0, 1000, 1000, 1000)); + vm4.invoke(() -> WANTestBase.checkBatchStats("ln", + 100)); + + vm5.invoke(() -> WANTestBase.checkQueueStats("ln", + 0, 1000, 0, 0)); + vm5.invoke(() -> WANTestBase.checkBatchStats("ln", + 0)); + + } + + @Test + public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, + false, 100, 10, false, false, null, true, 2, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, + false, 100, 10, false, false, null, true, 2, OrderPolicy.KEY )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + + vm5.invoke(() -> WANTestBase.doPuts( testName + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName + "_RR", 1000 )); + + pause(2000); + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 )); + + vm4.invoke(() -> WANTestBase.checkQueueStats("ln", + 0, 1000, 1000, 1000)); + vm4.invoke(() -> WANTestBase.checkBatchStats("ln", + 100)); + + vm5.invoke(() -> WANTestBase.checkQueueStats("ln", + 0, 1000, 0, 0)); + vm5.invoke(() -> WANTestBase.checkBatchStats("ln", + 0)); + + } + + @Test + public void testWANStatsTwoWanSites() throws Exception { + + Integer lnPort = createFirstLocatorWithDSId(1); + Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + createCacheInVMs(tkPort, vm3); + vm3.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", null, isOffHeap() )); + + startSenderInVMs("lnSerial1", vm4, vm5); + startSenderInVMs("lnSerial2", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + testName + "_RR", 1000 )); + + pause(2000); + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 )); + vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 )); + + vm4.invoke(() -> WANTestBase.checkQueueStats("lnSerial1", + 0, 1000, 1000, 1000)); + vm4.invoke(() -> WANTestBase.checkBatchStats("lnSerial1", + 100)); + vm4.invoke(() -> WANTestBase.checkQueueStats("lnSerial2", + 0, 1000, 1000, 1000)); + vm4.invoke(() -> WANTestBase.checkBatchStats("lnSerial2", + 100)); + vm5.invoke(() -> WANTestBase.checkQueueStats("lnSerial1", + 0, 1000, 0, 0)); + vm5.invoke(() -> WANTestBase.checkBatchStats("lnSerial1", + 0)); + vm5.invoke(() -> WANTestBase.checkQueueStats("lnSerial2", + 0, 1000, 0, 0)); + vm5.invoke(() -> WANTestBase.checkBatchStats("lnSerial2", + 0)); + + } + + @Test + public void testReplicatedSerialPropagationHA() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm2.invoke(() -> WANTestBase.createCache( nyPort )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.createCache(lnPort )); + vm5.invoke(() -> WANTestBase.createCache(lnPort )); + vm6.invoke(() -> WANTestBase.createCache(lnPort )); + vm7.invoke(() -> WANTestBase.createCache(lnPort )); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR", "ln", isOffHeap() )); + + AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR", 10000 )); + pause(2000); + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender( "ln" )); + Boolean isKilled = Boolean.FALSE; + try { + isKilled = (Boolean)inv2.getResult(); + } + catch (Throwable e) { + fail("Unexpected exception while killing a sender"); + } + AsyncInvocation inv3 = null; + if(!isKilled){ + inv3 = vm5.invokeAsync(() -> WANTestBase.killSender( "ln" )); + inv3.join(); + } + inv1.join(); + inv2.join(); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName + "_RR", 10000 )); + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(1000, 10000, 10000 )); + + vm5.invoke(() -> WANTestBase.checkStats_Failover("ln", 10000)); + } + + @Test + public void testReplicatedSerialPropagationUnprocessedEvents() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + //these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //senders are created on local site + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 20, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 20, false, false, null, true )); + + //create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", null, isOffHeap() )); + + //create another RR (RR_2) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_2", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_2", null, isOffHeap() )); + + //start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + //create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", "ln", isOffHeap() )); + + //create another RR (RR_2) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_2", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_2", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_2", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_2", "ln", isOffHeap() )); + + //start puts in RR_1 in another thread + vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR_1", 1000 )); + //do puts in RR_2 in main thread + vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR_2", 500 )); + //sleep for some time to let all the events propagate to remote site + Thread.sleep(20); + //vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName + "_RR_1", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + testName + "_RR_2", 500 )); + + pause(2000); + vm4.invoke(() -> WANTestBase.checkQueueStats("ln", + 0, 1500, 1500, 1500)); + vm4.invoke(() -> WANTestBase.checkBatchStats("ln", + 75)); + vm4.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 0)); + + + vm5.invoke(() -> WANTestBase.checkQueueStats("ln", + 0, 1500, 0, 0)); + vm5.invoke(() -> WANTestBase.checkBatchStats("ln", + 0)); + vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 1500)); + } + + /** + * + * Not Disabled - see ticket #52118 + * + * NOTE: The test failure is avoided by having a larger number of puts operation so + * that WANTestBase.verifyRegionQueueNotEmpty("ln" )) is successful as there is a + * significant delay during the high number of puts. + * + * In future if this failure reappears, the put operations must be increase or a better fix must be found. + * + * 1 region and sender configured on local site and 1 region and a + * receiver configured on remote site. Puts to the local region are in progress. + * Remote region is destroyed in the middle. + * + * @throws Exception + */ + @Test + public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { + int numEntries = 20000; + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //these are part of remote site + vm2.invoke(() -> WANTestBase.createCache( nyPort )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + //these are part of local site + vm4.invoke(() -> WANTestBase.createCache( lnPort )); + vm5.invoke(() -> WANTestBase.createCache( lnPort )); + vm6.invoke(() -> WANTestBase.createCache( lnPort )); + vm7.invoke(() -> WANTestBase.createCache( lnPort )); + + //senders are created on local site + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 100, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 100, false, false, null, true )); + + //create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", null, isOffHeap() )); + //This is to cause a scenario where we have received at least X events and want to slow the receiver + vm2.invoke(() -> WANTestBase.longPauseAfterNumEvents(500, 200)); + //start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + //create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + testName + "_RR_1", "ln", isOffHeap() )); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR_1", numEntries )); + //destroy RR_1 in remote site + vm2.invoke(() -> WANTestBase.destroyRegion( testName + "_RR_1", 500)); + + try { + inv1.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + //assuming some events might have been dispatched before the remote region was destroyed, + //sender's region queue will have events less than 1000 but the queue will not be empty. + //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of + //more in depth validations. + vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty("ln" )); + + //verify that all is well in local site. All the events should be present in local region + vm4.invoke(() -> WANTestBase.validateRegionSize( + testName + "_RR_1", numEntries )); + + //like a latch to guarantee at least one exception returned + vm4.invoke(() -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> WANTestBase.verifyQueueSize("ln", 0))); + + vm4.invoke(() -> WANTestBase.checkBatchStats("ln", true, true)); + + vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", numEntries)); + + vm2.invoke(() -> WANTestBase.checkExceptionStats(1)); + + } + + @Test + public void testSerialPropagationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + testName, null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + testName, null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( testName, 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, 800 )); + + pause(2000); + vm4.invoke(() -> WANTestBase.checkQueueStats("ln", + 0, 1000, 900, 800)); + vm4.invoke(() -> WANTestBase.checkEventFilteredStats("ln", + 200)); + vm4.invoke(() -> WANTestBase.checkBatchStats("ln", + 80)); + vm4.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 0)); + + + vm5.invoke(() -> WANTestBase.checkQueueStats("ln", + 0, 1000, 0, 0)); + vm5.invoke(() -> WANTestBase.checkBatchStats("ln", + 0)); + vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln",900)); + } + + @Test + public void testSerialPropagationConflation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, true, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", 0, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", 0, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", 0, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + testName, "ln", 0, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.startSender( "ln" )); + + vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + testName, null,1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + testName, null,1, 100, isOffHeap() )); + + final Map keyValues = new HashMap(); + final Map updateKeyValues = new HashMap(); + for(int i=0; i< 1000; i++) { + keyValues.put(i, i); + } + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, keyValues )); + + vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() )); + for(int i=0;i<500;i++) { + updateKeyValues.put(i, i+"_updated"); + } + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues )); + + vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, 0 )); + + vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues )); + + vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + + keyValues.putAll(updateKeyValues); + vm2.invoke(() -> WANTestBase.validateRegionSize( + testName, keyValues.size() )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + testName, keyValues.size() )); + + vm2.invoke(() -> WANTestBase.validateRegionContents( + testName, keyValues )); + vm3.invoke(() -> WANTestBase.validateRegionContents( + testName, keyValues )); + + pause(2000); + vm4.invoke(() -> WANTestBase.checkQueueStats("ln", + 0, 2000, 2000, 1500)); + vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", + 500)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandTestBase.java new file mode 100644 index 0000000..f9b46ef --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandTestBase.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.wancommand; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.wan.*; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.AvailablePort; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.management.ManagementService; +import com.gemstone.gemfire.management.internal.cli.commands.CliCommandTestBase; +import com.gemstone.gemfire.test.dunit.*; + +import javax.management.remote.JMXConnectorServer; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; +import static com.gemstone.gemfire.test.dunit.Assert.assertEquals; +import static com.gemstone.gemfire.test.dunit.Assert.fail; + +public abstract class WANCommandTestBase extends CliCommandTestBase { + + static Cache cache; + private JMXConnectorServer jmxConnectorServer; + private ManagementService managementService; +// public String jmxHost; +// public int jmxPort; + + static VM vm0; + static VM vm1; + static VM vm2; + static VM vm3; + static VM vm4; + static VM vm5; + static VM vm6; + static VM vm7; + + @Override + public final void postSetUpCliCommandTestBase() throws Exception { + final Host host = Host.getHost(0); + vm0 = host.getVM(0); + vm1 = host.getVM(1); + vm2 = host.getVM(2); + vm3 = host.getVM(3); + vm4 = host.getVM(4); + vm5 = host.getVM(5); + vm6 = host.getVM(6); + vm7 = host.getVM(7); + enableManagement(); + } + + public Integer createFirstLocatorWithDSId(int dsId) { + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + port + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); + InternalDistributedSystem ds = getSystem(props); + cache = CacheFactory.create(ds); + return port; + } + + public Integer createFirstRemoteLocator(int dsId, int remoteLocPort) { + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); + props.setProperty(LOCATORS, "localhost[" + port + "]"); + props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); + props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); + getSystem(props); + return port; + } + + public void createCache(Integer locPort){ + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + InternalDistributedSystem ds = getSystem(props); + cache = CacheFactory.create(ds); + } + + public void createCacheWithGroups(Integer locPort, String groups){ + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + props.setProperty(GROUPS, groups); + InternalDistributedSystem ds = getSystem(props); + cache = CacheFactory.create(ds); + } + + public void createSender(String dsName, int remoteDsId, + boolean isParallel, Integer maxMemory, + Integer batchSize, boolean isConflation, boolean isPersistent, + GatewayEventFilter filter, boolean isManualStart) { + File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File [] dirs1 = new File[] {persistentDirectory}; + if(isParallel) { + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); + gateway.setParallel(true); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManualStart); + if (filter != null) { + gateway.addGatewayEventFilter(filter); + } + if(isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + gateway.setBatchConflationEnabled(isConflation); + gateway.create(dsName, remoteDsId); + + }else { + GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManualStart); + if (filter != null) { + gateway.addGatewayEventFilter(filter); + } + gateway.setBatchConflationEnabled(isConflation); + if(isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + gateway.create(dsName, remoteDsId); + } + } + + public void startSender(String senderId){ + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + try { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + sender.start(); + } finally { + exln.remove(); + } + } + + public void pauseSender(String senderId){ + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + try { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + sender.pause(); + } finally { + exln.remove(); + } + } + + public int createAndStartReceiver(int locPort) { + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + + "]"); + + InternalDistributedSystem ds = getSystem(props); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } catch (IOException e) { + e.printStackTrace(); + fail("Test " + getName() + " failed to start GatewayReceiver"); + } + return port; + } + + public int createReceiver(int locPort) { + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + + "]"); + + InternalDistributedSystem ds = getSystem(props); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + fact.setStartPort(AvailablePort.AVAILABLE_PORTS_LOWER_BOUND); + fact.setEndPort(AvailablePort.AVAILABLE_PORTS_UPPER_BOUND); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + return receiver.getPort(); + } + + public int createReceiverWithGroup(int locPort, String groups) { + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + + "]"); + props.setProperty(GROUPS, groups); + + InternalDistributedSystem ds = getSystem(props); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + fact.setStartPort(AvailablePort.AVAILABLE_PORTS_LOWER_BOUND); + fact.setEndPort(AvailablePort.AVAILABLE_PORTS_UPPER_BOUND); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + return receiver.getPort(); + + } + + public void startReceiver() { + try { + Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); + for (GatewayReceiver receiver : receivers) { + receiver.start(); + } + } catch (IOException e) { + e.printStackTrace(); + fail("Test " + getName() + " failed to start GatewayReceiver"); + } + } + + public void stopReceiver() { + Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); + for (GatewayReceiver receiver : receivers) { + receiver.stop(); + } + } + + public int createAndStartReceiverWithGroup(int locPort, String groups) { + Properties props = getDistributedSystemProperties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, "localhost[" + locPort + + "]"); + props.setProperty(GROUPS, groups); + + InternalDistributedSystem ds = getSystem(props); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + fact.setManualStart(true); + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } catch (IOException e) { + e.printStackTrace(); + fail("Test " + getName() + " failed to start GatewayReceiver on port " + port); + } + return port; + } + + public DistributedMember getMember(){ + return cache.getDistributedSystem().getDistributedMember(); + } + + + public int getLocatorPort(){ + return Locator.getLocators().get(0).getPort(); + } + + /** + * Enable system property gemfire.disableManagement false in each VM. + */ + public void enableManagement() { + Invoke.invokeInEveryVM(new SerializableRunnable("Enable Management") { + public void run() { + System.setProperty(InternalDistributedSystem.DISABLE_MANAGEMENT_PROPERTY, "false"); + } + }); + + } + + public void verifySenderState(String senderId, boolean isRunning, boolean isPaused) { + final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); + try { + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender) s; + break; + } + } + + assertEquals(isRunning, sender.isRunning()); + assertEquals(isPaused, sender.isPaused()); + } finally { + exln.remove(); + } + } + + public void verifySenderAttributes(String senderId, int remoteDsID, + boolean isParallel, boolean manualStart, int socketBufferSize, + int socketReadTimeout, boolean enableBatchConflation, int batchSize, + int batchTimeInterval, boolean enablePersistence, + boolean diskSynchronous, int maxQueueMemory, int alertThreshold, + int dispatcherThreads, OrderPolicy orderPolicy, + List<String> expectedGatewayEventFilters, + List<String> expectedGatewayTransportFilters) { + + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + assertEquals("remoteDistributedSystemId", remoteDsID, sender + .getRemoteDSId()); + assertEquals("isParallel", isParallel, sender.isParallel()); + assertEquals("manualStart", manualStart, sender.isManualStart()); + assertEquals("socketBufferSize", socketBufferSize, sender + .getSocketBufferSize()); + assertEquals("socketReadTimeout", socketReadTimeout, sender + .getSocketReadTimeout()); + assertEquals("enableBatchConflation", enableBatchConflation, sender + .isBatchConflationEnabled()); + assertEquals("batchSize", batchSize, sender.getBatchSize()); + assertEquals("batchTimeInterval", batchTimeInterval, sender + .getBatchTimeInterval()); + assertEquals("enablePersistence", enablePersistence, sender + .isPersistenceEnabled()); + assertEquals("diskSynchronous", diskSynchronous, sender.isDiskSynchronous()); + assertEquals("maxQueueMemory", maxQueueMemory, sender + .getMaximumQueueMemory()); + assertEquals("alertThreshold", alertThreshold, sender.getAlertThreshold()); + assertEquals("dispatcherThreads", dispatcherThreads, sender + .getDispatcherThreads()); + assertEquals("orderPolicy", orderPolicy, sender.getOrderPolicy()); + + // verify GatewayEventFilters + if (expectedGatewayEventFilters != null) { + assertEquals("gatewayEventFilters", expectedGatewayEventFilters.size(), + sender.getGatewayEventFilters().size()); + + List<GatewayEventFilter> actualGatewayEventFilters = sender + .getGatewayEventFilters(); + List<String> actualEventFilterClassNames = new ArrayList<String>( + actualGatewayEventFilters.size()); + for (GatewayEventFilter filter : actualGatewayEventFilters) { + actualEventFilterClassNames.add(filter.getClass().getName()); + } + + for (String expectedGatewayEventFilter : expectedGatewayEventFilters) { + if (!actualEventFilterClassNames.contains(expectedGatewayEventFilter)) { + fail("GatewayEventFilter " + expectedGatewayEventFilter + + " is not added to the GatewaySender"); + } + } + } + + // verify GatewayTransportFilters + if (expectedGatewayTransportFilters != null) { + assertEquals("gatewayTransportFilters", expectedGatewayTransportFilters + .size(), sender.getGatewayTransportFilters().size()); + List<GatewayTransportFilter> actualGatewayTransportFilters = sender + .getGatewayTransportFilters(); + List<String> actualTransportFilterClassNames = new ArrayList<String>( + actualGatewayTransportFilters.size()); + for (GatewayTransportFilter filter : actualGatewayTransportFilters) { + actualTransportFilterClassNames.add(filter.getClass().getName()); + } + + for (String expectedGatewayTransportFilter : expectedGatewayTransportFilters) { + if (!actualTransportFilterClassNames + .contains(expectedGatewayTransportFilter)) { + fail("GatewayTransportFilter " + expectedGatewayTransportFilter + + " is not added to the GatewaySender."); + } + } + } + } + + public void verifyReceiverState(boolean isRunning) { + Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); + for (GatewayReceiver receiver : receivers) { + assertEquals(isRunning, receiver.isRunning()); + } + } + + public void verifyReceiverCreationWithAttributes(boolean isRunning, + int startPort, int endPort, String bindAddress, int maxTimeBetweenPings, + int socketBufferSize, List<String> expectedGatewayTransportFilters) { + + Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); + assertEquals("Number of receivers is incorrect", 1, receivers.size()); + for (GatewayReceiver receiver : receivers) { + assertEquals("isRunning", isRunning, receiver.isRunning()); + assertEquals("startPort", startPort, receiver.getStartPort()); + assertEquals("endPort", endPort, receiver.getEndPort()); + assertEquals("bindAddress", bindAddress, receiver.getBindAddress()); + assertEquals("maximumTimeBetweenPings", maxTimeBetweenPings, receiver + .getMaximumTimeBetweenPings()); + assertEquals("socketBufferSize", socketBufferSize, receiver + .getSocketBufferSize()); + + // verify GatewayTransportFilters + if (expectedGatewayTransportFilters != null) { + assertEquals("gatewayTransportFilters", expectedGatewayTransportFilters + .size(), receiver.getGatewayTransportFilters().size()); + List<GatewayTransportFilter> actualGatewayTransportFilters = receiver + .getGatewayTransportFilters(); + List<String> actualTransportFilterClassNames = new ArrayList<String>( + actualGatewayTransportFilters.size()); + for (GatewayTransportFilter filter : actualGatewayTransportFilters) { + actualTransportFilterClassNames.add(filter.getClass().getName()); + } + + for (String expectedGatewayTransportFilter : expectedGatewayTransportFilters) { + if (!actualTransportFilterClassNames + .contains(expectedGatewayTransportFilter)) { + fail("GatewayTransportFilter " + expectedGatewayTransportFilter + + " is not added to the GatewayReceiver."); + } + } + } + } + } + + @Override + public final void postTearDownCacheTestCase() throws Exception { + closeCacheAndDisconnect(); + vm0.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); + vm1.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); + vm2.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); + vm3.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); + vm4.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); + vm5.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); + vm6.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); + vm7.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); + } + + public static void closeCacheAndDisconnect() { + if (cache != null && !cache.isClosed()) { + cache.close(); + cache.getDistributedSystem().disconnect(); + } + } +}
