http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java deleted file mode 100644 index 47dee96..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.CancelException; -import com.gemstone.gemfire.cache.CacheClosedException; -import com.gemstone.gemfire.internal.cache.ForceReattemptException; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -@Category(DistributedTest.class) -public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase { - - private static final long serialVersionUID = 1L; - - public SerialWANPropagation_PartitionedRegionDUnitTest() { - super(); - } - - @Test - public void testPartitionedSerialPropagation() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - //vm5.invoke(() -> WANTestBase.startSender( "ln" )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testBothReplicatedAndPartitionedSerialPropagation() - throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testSerialReplicatedAndPartitionedPropagation() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "lnSerial", - 2, false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnSerial", - 2, false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.startSender( "lnSerial" )); - vm5.invoke(() -> WANTestBase.startSender( "lnSerial" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testSerialReplicatedAndSerialPartitionedPropagation() - throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true )); - - vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", - 2, false, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "lnSerial2", - 2, false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() )); - - startSenderInVMs("lnSerial1", vm4, vm5); - - startSenderInVMs("lnSerial2", vm5, vm6); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testPartitionedSerialPropagationToTwoWanSites() throws Exception { - - Integer lnPort = createFirstLocatorWithDSId(1); - Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort )); - Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3,lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - createCacheInVMs(tkPort, vm3); - vm3.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - startSenderInVMs("lnSerial1", vm4, vm5); - startSenderInVMs("lnSerial2", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testPartitionedSerialPropagationHA() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - //do initial 100 puts to create all the buckets - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); - - IgnoredException.addIgnoredException(CancelException.class.getName()); - IgnoredException.addIgnoredException(CacheClosedException.class.getName()); - IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); - //start async puts - AsyncInvocation inv = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - //close the cache on vm4 in between the puts - vm4.invoke(() -> WANTestBase.killSender()); - - inv.join(); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testPartitionedSerialPropagationWithParallelThreads() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - createReceiverInVMs(vm2, vm3); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - startSenderInVMs("ln", vm4, vm5); - - - - vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( // TODO: eats exceptions - getTestMethodName() + "_PR", 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java deleted file mode 100644 index f3b6765..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java +++ /dev/null @@ -1,338 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; - - -@Category(DistributedTest.class) -public class SerialWANPropagationsFeatureDUnitTest extends WANTestBase{ - - private static final long serialVersionUID = 1L; - - public SerialWANPropagationsFeatureDUnitTest() { - super(); - } - - @Test - public void testSerialReplicatedWanWithOverflow() { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 10, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 10, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - vm2.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); - vm3.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doHeavyPuts( - getTestMethodName() + "_RR", 15 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 15, 240000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 15, 240000 )); - } - - @Test - public void testSerialReplicatedWanWithPersistence() { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, true, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } - - @Test - public void testReplicatedSerialPropagationWithConflation() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 1000, true, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 1000, true, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testReplicatedSerialPropagationWithParallelThreads() - throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( - getTestMethodName() + "_RR", 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testSerialPropagationWithFilter() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - } - - @Test - public void testReplicatedSerialPropagationWithFilter() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - } - - @Test - public void testReplicatedSerialPropagationWithFilter_AfterAck() - throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm6, vm7); - createReceiverInVMs(vm6, vm7); - - createCacheInVMs(lnPort, vm2, vm3, vm4, vm5); - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, - new MyGatewayEventFilter_AfterAck(), true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false, - new MyGatewayEventFilter_AfterAck(), true )); - - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", - 0 )); - - Integer vm4Acks = (Integer)vm4.invoke(() -> WANTestBase.validateAfterAck( "ln")); - Integer vm5Acks = (Integer)vm5.invoke(() -> WANTestBase.validateAfterAck( "ln")); - - assertEquals(2000, (vm4Acks + vm5Acks)); - - vm6.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - vm7.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 1000 )); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java deleted file mode 100644 index 2019cac..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java +++ /dev/null @@ -1,588 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.serial; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import static com.gemstone.gemfire.test.dunit.Wait.*; -import static com.gemstone.gemfire.test.dunit.IgnoredException.*; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.jayway.awaitility.Awaitility; - -@Category(DistributedTest.class) -public class SerialWANStatsDUnitTest extends WANTestBase { - - private static final long serialVersionUID = 1L; - - private String testName; - - public SerialWANStatsDUnitTest() { - super(); - } - - @Override - protected final void postSetUpWANTestBase() throws Exception { - this.testName = getTestMethodName(); - addIgnoredException("java.net.ConnectException"); - addIgnoredException("java.net.SocketException"); - addIgnoredException("Unexpected IOException"); - } - - @Test - public void testReplicatedSerialPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - - vm5.invoke(() -> WANTestBase.doPuts( testName + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - testName + "_RR", 1000 )); - - pause(2000); - vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 )); - - vm4.invoke(() -> WANTestBase.checkQueueStats("ln", - 0, 1000, 1000, 1000)); - vm4.invoke(() -> WANTestBase.checkBatchStats("ln", - 100)); - - vm5.invoke(() -> WANTestBase.checkQueueStats("ln", - 0, 1000, 0, 0)); - vm5.invoke(() -> WANTestBase.checkBatchStats("ln", - 0)); - - } - - @Test - public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, - false, 100, 10, false, false, null, true, 2, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2, - false, 100, 10, false, false, null, true, 2, OrderPolicy.KEY )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - - vm5.invoke(() -> WANTestBase.doPuts( testName + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - testName + "_RR", 1000 )); - - pause(2000); - vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 )); - - vm4.invoke(() -> WANTestBase.checkQueueStats("ln", - 0, 1000, 1000, 1000)); - vm4.invoke(() -> WANTestBase.checkBatchStats("ln", - 100)); - - vm5.invoke(() -> WANTestBase.checkQueueStats("ln", - 0, 1000, 0, 0)); - vm5.invoke(() -> WANTestBase.checkBatchStats("ln", - 0)); - - } - - @Test - public void testWANStatsTwoWanSites() throws Exception { - - Integer lnPort = createFirstLocatorWithDSId(1); - Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - createCacheInVMs(tkPort, vm3); - vm3.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", null, isOffHeap() )); - - startSenderInVMs("lnSerial1", vm4, vm5); - startSenderInVMs("lnSerial2", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - testName + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - testName + "_RR", 1000 )); - - pause(2000); - vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 )); - vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 )); - - vm4.invoke(() -> WANTestBase.checkQueueStats("lnSerial1", - 0, 1000, 1000, 1000)); - vm4.invoke(() -> WANTestBase.checkBatchStats("lnSerial1", - 100)); - vm4.invoke(() -> WANTestBase.checkQueueStats("lnSerial2", - 0, 1000, 1000, 1000)); - vm4.invoke(() -> WANTestBase.checkBatchStats("lnSerial2", - 100)); - vm5.invoke(() -> WANTestBase.checkQueueStats("lnSerial1", - 0, 1000, 0, 0)); - vm5.invoke(() -> WANTestBase.checkBatchStats("lnSerial1", - 0)); - vm5.invoke(() -> WANTestBase.checkQueueStats("lnSerial2", - 0, 1000, 0, 0)); - vm5.invoke(() -> WANTestBase.checkBatchStats("lnSerial2", - 0)); - - } - - @Test - public void testReplicatedSerialPropagationHA() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.createCache(lnPort )); - vm5.invoke(() -> WANTestBase.createCache(lnPort )); - vm6.invoke(() -> WANTestBase.createCache(lnPort )); - vm7.invoke(() -> WANTestBase.createCache(lnPort )); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR", "ln", isOffHeap() )); - - AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR", 10000 )); - pause(2000); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender( "ln" )); - Boolean isKilled = Boolean.FALSE; - try { - isKilled = (Boolean)inv2.getResult(); - } - catch (Throwable e) { - fail("Unexpected exception while killing a sender"); - } - AsyncInvocation inv3 = null; - if(!isKilled){ - inv3 = vm5.invokeAsync(() -> WANTestBase.killSender( "ln" )); - inv3.join(); - } - inv1.join(); - inv2.join(); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - testName + "_RR", 10000 )); - - vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(1000, 10000, 10000 )); - - vm5.invoke(() -> WANTestBase.checkStats_Failover("ln", 10000)); - } - - @Test - public void testReplicatedSerialPropagationUnprocessedEvents() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 20, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 20, false, false, null, true )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", null, isOffHeap() )); - - //create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_2", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", "ln", isOffHeap() )); - - //create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_2", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_2", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_2", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_2", "ln", isOffHeap() )); - - //start puts in RR_1 in another thread - vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR_1", 1000 )); - //do puts in RR_2 in main thread - vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR_2", 500 )); - //sleep for some time to let all the events propagate to remote site - Thread.sleep(20); - //vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - testName + "_RR_1", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - testName + "_RR_2", 500 )); - - pause(2000); - vm4.invoke(() -> WANTestBase.checkQueueStats("ln", - 0, 1500, 1500, 1500)); - vm4.invoke(() -> WANTestBase.checkBatchStats("ln", - 75)); - vm4.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 0)); - - - vm5.invoke(() -> WANTestBase.checkQueueStats("ln", - 0, 1500, 0, 0)); - vm5.invoke(() -> WANTestBase.checkBatchStats("ln", - 0)); - vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 1500)); - } - - /** - * - * Not Disabled - see ticket #52118 - * - * NOTE: The test failure is avoided by having a larger number of puts operation so - * that WANTestBase.verifyRegionQueueNotEmpty("ln" )) is successful as there is a - * significant delay during the high number of puts. - * - * In future if this failure reappears, the put operations must be increase or a better fix must be found. - * - * 1 region and sender configured on local site and 1 region and a - * receiver configured on remote site. Puts to the local region are in progress. - * Remote region is destroyed in the middle. - * - * @throws Exception - */ - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { - int numEntries = 20000; - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - vm2.invoke(() -> WANTestBase.createCache( nyPort )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - //these are part of local site - vm4.invoke(() -> WANTestBase.createCache( lnPort )); - vm5.invoke(() -> WANTestBase.createCache( lnPort )); - vm6.invoke(() -> WANTestBase.createCache( lnPort )); - vm7.invoke(() -> WANTestBase.createCache( lnPort )); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 100, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 100, false, false, null, true )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", null, isOffHeap() )); - //This is to cause a scenario where we have received at least X events and want to slow the receiver - vm2.invoke(() -> WANTestBase.longPauseAfterNumEvents(500, 200)); - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - testName + "_RR_1", "ln", isOffHeap() )); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR_1", numEntries )); - //destroy RR_1 in remote site - vm2.invoke(() -> WANTestBase.destroyRegion( testName + "_RR_1", 500)); - - try { - inv1.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - - //assuming some events might have been dispatched before the remote region was destroyed, - //sender's region queue will have events less than 1000 but the queue will not be empty. - //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of - //more in depth validations. - vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty("ln" )); - - //verify that all is well in local site. All the events should be present in local region - vm4.invoke(() -> WANTestBase.validateRegionSize( - testName + "_RR_1", numEntries )); - - //like a latch to guarantee at least one exception returned - vm4.invoke(() -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> WANTestBase.verifyQueueSize("ln", 0))); - - vm4.invoke(() -> WANTestBase.checkBatchStats("ln", true, true)); - - vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", numEntries)); - - vm2.invoke(() -> WANTestBase.checkExceptionStats(1)); - - } - - @Test - public void testSerialPropagationWithFilter() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - testName, "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - testName, "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - testName, "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - testName, "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - testName, null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - testName, null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( testName, 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - testName, 800 )); - - pause(2000); - vm4.invoke(() -> WANTestBase.checkQueueStats("ln", - 0, 1000, 900, 800)); - vm4.invoke(() -> WANTestBase.checkEventFilteredStats("ln", - 200)); - vm4.invoke(() -> WANTestBase.checkBatchStats("ln", - 80)); - vm4.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 0)); - - - vm5.invoke(() -> WANTestBase.checkQueueStats("ln", - 0, 1000, 0, 0)); - vm5.invoke(() -> WANTestBase.checkBatchStats("ln", - 0)); - vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln",900)); - } - - @Test - public void testSerialPropagationConflation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, true, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - testName, "ln", 0, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - testName, "ln", 0, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - testName, "ln", 0, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - testName, "ln", 0, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.startSender( "ln" )); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - testName, null,1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - testName, null,1, 100, isOffHeap() )); - - final Map keyValues = new HashMap(); - final Map updateKeyValues = new HashMap(); - for(int i=0; i< 1000; i++) { - keyValues.put(i, i); - } - - vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, keyValues )); - - vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() )); - for(int i=0;i<500;i++) { - updateKeyValues.put(i, i+"_updated"); - } - - vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues )); - - vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - testName, 0 )); - - vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues )); - - vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - - keyValues.putAll(updateKeyValues); - vm2.invoke(() -> WANTestBase.validateRegionSize( - testName, keyValues.size() )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - testName, keyValues.size() )); - - vm2.invoke(() -> WANTestBase.validateRegionContents( - testName, keyValues )); - vm3.invoke(() -> WANTestBase.validateRegionContents( - testName, keyValues )); - - pause(2000); - vm4.invoke(() -> WANTestBase.checkQueueStats("ln", - 0, 2000, 2000, 1500)); - vm4.invoke(() -> WANTestBase.checkConflatedStats("ln", - 500)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java deleted file mode 100644 index f9b46ef..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java +++ /dev/null @@ -1,490 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.wancommand; - -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.DiskStore; -import com.gemstone.gemfire.cache.DiskStoreFactory; -import com.gemstone.gemfire.cache.wan.*; -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.distributed.DistributedMember; -import com.gemstone.gemfire.distributed.Locator; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.internal.AvailablePort; -import com.gemstone.gemfire.internal.AvailablePortHelper; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.management.ManagementService; -import com.gemstone.gemfire.management.internal.cli.commands.CliCommandTestBase; -import com.gemstone.gemfire.test.dunit.*; - -import javax.management.remote.JMXConnectorServer; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; -import static com.gemstone.gemfire.test.dunit.Assert.assertEquals; -import static com.gemstone.gemfire.test.dunit.Assert.fail; - -public abstract class WANCommandTestBase extends CliCommandTestBase { - - static Cache cache; - private JMXConnectorServer jmxConnectorServer; - private ManagementService managementService; -// public String jmxHost; -// public int jmxPort; - - static VM vm0; - static VM vm1; - static VM vm2; - static VM vm3; - static VM vm4; - static VM vm5; - static VM vm6; - static VM vm7; - - @Override - public final void postSetUpCliCommandTestBase() throws Exception { - final Host host = Host.getHost(0); - vm0 = host.getVM(0); - vm1 = host.getVM(1); - vm2 = host.getVM(2); - vm3 = host.getVM(3); - vm4 = host.getVM(4); - vm5 = host.getVM(5); - vm6 = host.getVM(6); - vm7 = host.getVM(7); - enableManagement(); - } - - public Integer createFirstLocatorWithDSId(int dsId) { - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + port + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); - InternalDistributedSystem ds = getSystem(props); - cache = CacheFactory.create(ds); - return port; - } - - public Integer createFirstRemoteLocator(int dsId, int remoteLocPort) { - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId); - props.setProperty(LOCATORS, "localhost[" + port + "]"); - props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost"); - props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]"); - getSystem(props); - return port; - } - - public void createCache(Integer locPort){ - Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort + "]"); - InternalDistributedSystem ds = getSystem(props); - cache = CacheFactory.create(ds); - } - - public void createCacheWithGroups(Integer locPort, String groups){ - Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort + "]"); - props.setProperty(GROUPS, groups); - InternalDistributedSystem ds = getSystem(props); - cache = CacheFactory.create(ds); - } - - public void createSender(String dsName, int remoteDsId, - boolean isParallel, Integer maxMemory, - Integer batchSize, boolean isConflation, boolean isPersistent, - GatewayEventFilter filter, boolean isManualStart) { - File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum()); - persistentDirectory.mkdir(); - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - File [] dirs1 = new File[] {persistentDirectory}; - if(isParallel) { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setParallel(true); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - if(isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName()); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.setBatchConflationEnabled(isConflation); - gateway.create(dsName, remoteDsId); - - }else { - GatewaySenderFactory gateway = cache.createGatewaySenderFactory(); - gateway.setMaximumQueueMemory(maxMemory); - gateway.setBatchSize(batchSize); - gateway.setManualStart(isManualStart); - if (filter != null) { - gateway.addGatewayEventFilter(filter); - } - gateway.setBatchConflationEnabled(isConflation); - if(isPersistent) { - gateway.setPersistenceEnabled(true); - gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName()); - } - else { - DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); - gateway.setDiskStoreName(store.getName()); - } - gateway.create(dsName, remoteDsId); - } - } - - public void startSender(String senderId){ - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - try { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - sender.start(); - } finally { - exln.remove(); - } - } - - public void pauseSender(String senderId){ - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - try { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - sender.pause(); - } finally { - exln.remove(); - } - } - - public int createAndStartReceiver(int locPort) { - Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort - + "]"); - - InternalDistributedSystem ds = getSystem(props); - cache = CacheFactory.create(ds); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - fact.setStartPort(port); - fact.setEndPort(port); - fact.setManualStart(true); - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } catch (IOException e) { - e.printStackTrace(); - fail("Test " + getName() + " failed to start GatewayReceiver"); - } - return port; - } - - public int createReceiver(int locPort) { - Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort - + "]"); - - InternalDistributedSystem ds = getSystem(props); - cache = CacheFactory.create(ds); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setStartPort(AvailablePort.AVAILABLE_PORTS_LOWER_BOUND); - fact.setEndPort(AvailablePort.AVAILABLE_PORTS_UPPER_BOUND); - fact.setManualStart(true); - GatewayReceiver receiver = fact.create(); - return receiver.getPort(); - } - - public int createReceiverWithGroup(int locPort, String groups) { - Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort - + "]"); - props.setProperty(GROUPS, groups); - - InternalDistributedSystem ds = getSystem(props); - cache = CacheFactory.create(ds); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - fact.setStartPort(AvailablePort.AVAILABLE_PORTS_LOWER_BOUND); - fact.setEndPort(AvailablePort.AVAILABLE_PORTS_UPPER_BOUND); - fact.setManualStart(true); - GatewayReceiver receiver = fact.create(); - return receiver.getPort(); - - } - - public void startReceiver() { - try { - Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); - for (GatewayReceiver receiver : receivers) { - receiver.start(); - } - } catch (IOException e) { - e.printStackTrace(); - fail("Test " + getName() + " failed to start GatewayReceiver"); - } - } - - public void stopReceiver() { - Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); - for (GatewayReceiver receiver : receivers) { - receiver.stop(); - } - } - - public int createAndStartReceiverWithGroup(int locPort, String groups) { - Properties props = getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + locPort - + "]"); - props.setProperty(GROUPS, groups); - - InternalDistributedSystem ds = getSystem(props); - cache = CacheFactory.create(ds); - GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); - int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); - fact.setStartPort(port); - fact.setEndPort(port); - fact.setManualStart(true); - GatewayReceiver receiver = fact.create(); - try { - receiver.start(); - } catch (IOException e) { - e.printStackTrace(); - fail("Test " + getName() + " failed to start GatewayReceiver on port " + port); - } - return port; - } - - public DistributedMember getMember(){ - return cache.getDistributedSystem().getDistributedMember(); - } - - - public int getLocatorPort(){ - return Locator.getLocators().get(0).getPort(); - } - - /** - * Enable system property gemfire.disableManagement false in each VM. - */ - public void enableManagement() { - Invoke.invokeInEveryVM(new SerializableRunnable("Enable Management") { - public void run() { - System.setProperty(InternalDistributedSystem.DISABLE_MANAGEMENT_PROPERTY, "false"); - } - }); - - } - - public void verifySenderState(String senderId, boolean isRunning, boolean isPaused) { - final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); - try { - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender) s; - break; - } - } - - assertEquals(isRunning, sender.isRunning()); - assertEquals(isPaused, sender.isPaused()); - } finally { - exln.remove(); - } - } - - public void verifySenderAttributes(String senderId, int remoteDsID, - boolean isParallel, boolean manualStart, int socketBufferSize, - int socketReadTimeout, boolean enableBatchConflation, int batchSize, - int batchTimeInterval, boolean enablePersistence, - boolean diskSynchronous, int maxQueueMemory, int alertThreshold, - int dispatcherThreads, OrderPolicy orderPolicy, - List<String> expectedGatewayEventFilters, - List<String> expectedGatewayTransportFilters) { - - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender)s; - break; - } - } - assertEquals("remoteDistributedSystemId", remoteDsID, sender - .getRemoteDSId()); - assertEquals("isParallel", isParallel, sender.isParallel()); - assertEquals("manualStart", manualStart, sender.isManualStart()); - assertEquals("socketBufferSize", socketBufferSize, sender - .getSocketBufferSize()); - assertEquals("socketReadTimeout", socketReadTimeout, sender - .getSocketReadTimeout()); - assertEquals("enableBatchConflation", enableBatchConflation, sender - .isBatchConflationEnabled()); - assertEquals("batchSize", batchSize, sender.getBatchSize()); - assertEquals("batchTimeInterval", batchTimeInterval, sender - .getBatchTimeInterval()); - assertEquals("enablePersistence", enablePersistence, sender - .isPersistenceEnabled()); - assertEquals("diskSynchronous", diskSynchronous, sender.isDiskSynchronous()); - assertEquals("maxQueueMemory", maxQueueMemory, sender - .getMaximumQueueMemory()); - assertEquals("alertThreshold", alertThreshold, sender.getAlertThreshold()); - assertEquals("dispatcherThreads", dispatcherThreads, sender - .getDispatcherThreads()); - assertEquals("orderPolicy", orderPolicy, sender.getOrderPolicy()); - - // verify GatewayEventFilters - if (expectedGatewayEventFilters != null) { - assertEquals("gatewayEventFilters", expectedGatewayEventFilters.size(), - sender.getGatewayEventFilters().size()); - - List<GatewayEventFilter> actualGatewayEventFilters = sender - .getGatewayEventFilters(); - List<String> actualEventFilterClassNames = new ArrayList<String>( - actualGatewayEventFilters.size()); - for (GatewayEventFilter filter : actualGatewayEventFilters) { - actualEventFilterClassNames.add(filter.getClass().getName()); - } - - for (String expectedGatewayEventFilter : expectedGatewayEventFilters) { - if (!actualEventFilterClassNames.contains(expectedGatewayEventFilter)) { - fail("GatewayEventFilter " + expectedGatewayEventFilter - + " is not added to the GatewaySender"); - } - } - } - - // verify GatewayTransportFilters - if (expectedGatewayTransportFilters != null) { - assertEquals("gatewayTransportFilters", expectedGatewayTransportFilters - .size(), sender.getGatewayTransportFilters().size()); - List<GatewayTransportFilter> actualGatewayTransportFilters = sender - .getGatewayTransportFilters(); - List<String> actualTransportFilterClassNames = new ArrayList<String>( - actualGatewayTransportFilters.size()); - for (GatewayTransportFilter filter : actualGatewayTransportFilters) { - actualTransportFilterClassNames.add(filter.getClass().getName()); - } - - for (String expectedGatewayTransportFilter : expectedGatewayTransportFilters) { - if (!actualTransportFilterClassNames - .contains(expectedGatewayTransportFilter)) { - fail("GatewayTransportFilter " + expectedGatewayTransportFilter - + " is not added to the GatewaySender."); - } - } - } - } - - public void verifyReceiverState(boolean isRunning) { - Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); - for (GatewayReceiver receiver : receivers) { - assertEquals(isRunning, receiver.isRunning()); - } - } - - public void verifyReceiverCreationWithAttributes(boolean isRunning, - int startPort, int endPort, String bindAddress, int maxTimeBetweenPings, - int socketBufferSize, List<String> expectedGatewayTransportFilters) { - - Set<GatewayReceiver> receivers = cache.getGatewayReceivers(); - assertEquals("Number of receivers is incorrect", 1, receivers.size()); - for (GatewayReceiver receiver : receivers) { - assertEquals("isRunning", isRunning, receiver.isRunning()); - assertEquals("startPort", startPort, receiver.getStartPort()); - assertEquals("endPort", endPort, receiver.getEndPort()); - assertEquals("bindAddress", bindAddress, receiver.getBindAddress()); - assertEquals("maximumTimeBetweenPings", maxTimeBetweenPings, receiver - .getMaximumTimeBetweenPings()); - assertEquals("socketBufferSize", socketBufferSize, receiver - .getSocketBufferSize()); - - // verify GatewayTransportFilters - if (expectedGatewayTransportFilters != null) { - assertEquals("gatewayTransportFilters", expectedGatewayTransportFilters - .size(), receiver.getGatewayTransportFilters().size()); - List<GatewayTransportFilter> actualGatewayTransportFilters = receiver - .getGatewayTransportFilters(); - List<String> actualTransportFilterClassNames = new ArrayList<String>( - actualGatewayTransportFilters.size()); - for (GatewayTransportFilter filter : actualGatewayTransportFilters) { - actualTransportFilterClassNames.add(filter.getClass().getName()); - } - - for (String expectedGatewayTransportFilter : expectedGatewayTransportFilters) { - if (!actualTransportFilterClassNames - .contains(expectedGatewayTransportFilter)) { - fail("GatewayTransportFilter " + expectedGatewayTransportFilter - + " is not added to the GatewayReceiver."); - } - } - } - } - } - - @Override - public final void postTearDownCacheTestCase() throws Exception { - closeCacheAndDisconnect(); - vm0.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); - vm1.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); - vm2.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); - vm3.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); - vm4.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); - vm5.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); - vm6.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); - vm7.invoke(() -> WANCommandTestBase.closeCacheAndDisconnect()); - } - - public static void closeCacheAndDisconnect() { - if (cache != null && !cache.isClosed()) { - cache.close(); - cache.getDistributedSystem().disconnect(); - } - } -}
