http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest.java deleted file mode 100644 index 5e32bb6..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.offheap; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.internal.cache.wan.serial.SerialWANPersistenceEnabledGatewaySenderDUnitTest; - -@SuppressWarnings("serial") -@Category(DistributedTest.class) -public class SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest extends - SerialWANPersistenceEnabledGatewaySenderDUnitTest { - - public SerialWANPersistenceEnabledGatewaySenderOffHeapDUnitTest() { - super(); - } - - @Override - public boolean isOffHeap() { - return true; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPropagationOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPropagationOffHeapDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPropagationOffHeapDUnitTest.java deleted file mode 100644 index d6cecd1..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPropagationOffHeapDUnitTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.offheap; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.internal.cache.wan.serial.SerialWANPropagationDUnitTest; - -@SuppressWarnings("serial") -@Category(DistributedTest.class) -public class SerialWANPropagationOffHeapDUnitTest extends SerialWANPropagationDUnitTest { - - public SerialWANPropagationOffHeapDUnitTest() { - super(); - } - - @Override - public boolean isOffHeap() { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPropagation_PartitionedRegionOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPropagation_PartitionedRegionOffHeapDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPropagation_PartitionedRegionOffHeapDUnitTest.java deleted file mode 100644 index 4abd6e6..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/offheap/SerialWANPropagation_PartitionedRegionOffHeapDUnitTest.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.offheap; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.internal.cache.wan.serial.SerialWANPropagation_PartitionedRegionDUnitTest; - -@SuppressWarnings("serial") -@Category(DistributedTest.class) -public class SerialWANPropagation_PartitionedRegionOffHeapDUnitTest - extends SerialWANPropagation_PartitionedRegionDUnitTest { - - public SerialWANPropagation_PartitionedRegionOffHeapDUnitTest() { - super(); - } - - @Override - public boolean isOffHeap() { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperation_2_DUnitTest.java deleted file mode 100644 index e0f29db..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperation_2_DUnitTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.parallel; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.internal.cache.wan.concurrent.ConcurrentParallelGatewaySenderOperation_2_DUnitTest; -import com.gemstone.gemfire.test.dunit.VM; - -@Category(DistributedTest.class) -public class ParallelGatewaySenderOperation_2_DUnitTest extends ConcurrentParallelGatewaySenderOperation_2_DUnitTest { - - private static final long serialVersionUID = 1L; - - public ParallelGatewaySenderOperation_2_DUnitTest() { - super(); - } - - protected void createSender(VM vm, int concurrencyLevel, boolean manualStart) { - vm.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, manualStart)); - } - - protected void createSenders(VM vm, int concurrencyLevel) { - vm.invoke(() -> createSender("ln1", 2, true, 100, 10, false, false, null, true)); - vm.invoke(() -> createSender("ln2", 3, true, 100, 10, false, false, null, true)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java deleted file mode 100644 index 4981c97..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java +++ /dev/null @@ -1,692 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.parallel; - -import static com.gemstone.gemfire.test.dunit.Assert.*; - -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.GemFireIOException; -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.internal.cache.tier.sockets.Message; -import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.RMIException; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -/** - * DUnit test for operations on ParallelGatewaySender - */ -@Category(DistributedTest.class) -public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase { - - @Override - protected final void postSetUpWANTestBase() throws Exception { - IgnoredException.addIgnoredException("Broken pipe||Unexpected IOException"); - } - - @Test - public void testParallelGatewaySenderWithoutStarting() { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, false); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - vm4.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); - vm5.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); - vm6.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); - vm7.invoke(() -> WANTestBase.verifySenderStoppedState( "ln" )); - - validateRegionSizes(getTestMethodName() + "_PR", 0, vm2, vm3); - } - - /** - * Defect 44323 (ParallelGatewaySender should not be started on Accessor node) - */ - @Test - public void testParallelGatewaySenderStartOnAccessorNode() { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true); - - Wait.pause(2000); - - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10 )); - - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - validateRegionSizes(getTestMethodName() + "_PR", 10, vm2, vm3); - } - - /** - * Normal scenario in which the sender is paused in between. - */ - @Test - public void testParallelPropagationSenderPause() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); - - //make sure all the senders are running before doing any puts - waitForSendersRunning(); - - //FIRST RUN: now, the senders are started. So, start the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); - - //now, pause all of the senders - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - - //SECOND RUN: keep one thread doing puts to the region - vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //verify region size remains on remote vm and is restricted below a specified limit (i.e. number of puts in the first run) - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 )); - } - - /** - * Normal scenario in which a paused sender is resumed. - */ - @Test - public void testParallelPropagationSenderResume() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); - - //make sure all the senders are running before doing any puts - waitForSendersRunning(); - - //now, the senders are started. So, start the puts - vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //now, pause all of the senders - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - - //sleep for a second or two - Wait.pause(2000); - - //resume the senders - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); - - Wait.pause(2000); - - validateParallelSenderQueueAllBucketsDrained(); - - //find the region size on remote vm - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); - } - - /** - * Negative scenario in which a sender that is stopped (and not paused) is resumed. - * Expected: resume is only valid for pause. If a sender which is stopped is resumed, - * it will not be started again. - */ - @Test - public void testParallelPropagationSenderResumeNegativeScenario() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - //wait till the senders are running - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - //start the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); - - //let the queue drain completely - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); - - //stop the senders - vm4.invoke(() -> WANTestBase.stopSender( "ln" )); - vm5.invoke(() -> WANTestBase.stopSender( "ln" )); - - //now, try to resume a stopped sender - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - - //do more puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //validate region size on remote vm to contain only the events put in local site - //before the senders are stopped. - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100 )); - } - - /** - * Normal scenario in which a sender is stopped. - */ - @Test - public void testParallelPropagationSenderStop() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); - - //make sure all the senders are running before doing any puts - waitForSendersRunning(); - - //FIRST RUN: now, the senders are started. So, do some of the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 )); - - //now, stop all of the senders - stopSenders(); - - //SECOND RUN: keep one thread doing puts - vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //verify region size remains on remote vm and is restricted below a specified limit (number of puts in the first run) - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 100 )); - } - - /** - * Normal scenario in which a sender is stopped and then started again. - */ - @Test - public void testParallelPropagationSenderStartAfterStop() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - String regionName = getTestMethodName() + "_PR"; - - - createCacheInVMs(nyPort, vm2, vm3); - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - - createReceiverInVMs(vm2, vm3); - - vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - vm6.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - vm7.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - - vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); - vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); - vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); - vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - //make sure all the senders are running before doing any puts - vm4.invoke(() -> waitForSenderRunningState("ln")); - vm5.invoke(() -> waitForSenderRunningState("ln")); - vm6.invoke(() -> waitForSenderRunningState("ln")); - vm7.invoke(() -> waitForSenderRunningState("ln")); - - //FIRST RUN: now, the senders are started. So, do some of the puts - vm4.invoke(() -> WANTestBase.doPuts( regionName, 200 )); - - //now, stop all of the senders - vm4.invoke(() -> stopSender("ln")); - vm5.invoke(() -> stopSender("ln")); - vm6.invoke(() -> stopSender("ln")); - vm7.invoke(() -> stopSender("ln")); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(regionName, 200 )); - - //SECOND RUN: do some of the puts after the senders are stopped - vm4.invoke(() -> WANTestBase.doPuts( regionName, 1000 )); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(regionName, 200 )); - - //start the senders again - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> waitForSenderRunningState("ln")); - vm5.invoke(() -> waitForSenderRunningState("ln")); - vm6.invoke(() -> waitForSenderRunningState("ln")); - vm7.invoke(() -> waitForSenderRunningState("ln")); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(regionName, 200 )); - - //SECOND RUN: do some more puts - AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( regionName, 1000 )); - async.join(); - - //verify all the buckets on all the sender nodes are drained - validateParallelSenderQueueAllBucketsDrained(); - - //verify the events propagate to remote site - vm2.invoke(() -> WANTestBase.validateRegionSize(regionName, 1000 )); - - vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - vm6.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - vm7.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 )); - } - - /** - * Normal scenario in which a sender is stopped and then started again. - * Differs from above test case in the way that when the sender is starting from - * stopped state, puts are simultaneously happening on the region by another thread. - */ - @Test - public void testParallelPropagationSenderStartAfterStop_Scenario2() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); - - //make sure all the senders are running before doing any puts - waitForSendersRunning(); - - LogWriterUtils.getLogWriter().info("All the senders are now started"); - - //FIRST RUN: now, the senders are started. So, do some of the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); - - LogWriterUtils.getLogWriter().info("Done few puts"); - - //Make sure the puts make it to the remote side - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200, 120000)); - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200, 120000)); - - //now, stop all of the senders - stopSenders(); - - LogWriterUtils.getLogWriter().info("All the senders are stopped"); - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200, 120000)); - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200, 120000)); - - //SECOND RUN: do some of the puts after the senders are stopped - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - LogWriterUtils.getLogWriter().info("Done some more puts in second run"); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); - - //SECOND RUN: start async puts on region - AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); - LogWriterUtils.getLogWriter().info("Started high number of puts by async thread"); - - LogWriterUtils.getLogWriter().info("Starting the senders at the same time"); - //when puts are happening by another thread, start the senders - startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); - - LogWriterUtils.getLogWriter().info("All the senders are started"); - - async.join(); - - //verify all the buckets on all the sender nodes are drained - validateParallelSenderQueueAllBucketsDrained(); - - //verify that the queue size ultimately becomes zero. That means all the events propagate to remote site. - vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); - } - - /** - * Normal scenario in which a sender is stopped and then started again on accessor node. - */ - @Test - public void testParallelPropagationSenderStartAfterStopOnAccessorNode() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true); - - //make sure all the senders are not running on accessor nodes and running on non-accessor nodes - waitForSendersRunning(); - - //FIRST RUN: now, the senders are started. So, do some of the puts - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 )); - - //now, stop all of the senders - stopSenders(); - - Wait.pause(2000); - - //SECOND RUN: do some of the puts after the senders are stopped - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); - - //start the senders again - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - //Region size on remote site should remain same and below the number of puts done in the FIRST RUN - vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 )); - - //SECOND RUN: do some more puts - AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - async.join(); - Wait.pause(5000); - - //verify all buckets drained only on non-accessor nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - //verify the events propagate to remote site - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000 )); - } - - /** - * Normal scenario in which a combinations of start, pause, resume operations - * is tested - */ - @Test - public void testStartPauseResumeParallelGatewaySender() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - LogWriterUtils.getLogWriter().info("Done 1000 puts on local site"); - - //Since puts are already done on userPR, it will have the buckets created. - //During sender start, it will wait until those buckets are created for shadowPR as well. - //Start the senders in async threads, so colocation of shadowPR will be complete and - //missing buckets will be created in PRHARedundancyProvider.createMissingBuckets(). - startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); - - waitForSendersRunning(); - - LogWriterUtils.getLogWriter().info("Started senders on local site"); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 )); - LogWriterUtils.getLogWriter().info("Done 5000 puts on local site"); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - LogWriterUtils.getLogWriter().info("Paused senders on local site"); - - vm4.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); - vm5.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); - vm6.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); - vm7.invoke(() -> WANTestBase.verifySenderPausedState( "ln" )); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); - LogWriterUtils.getLogWriter().info("Started 1000 async puts on local site"); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); - LogWriterUtils.getLogWriter().info("Resumed senders on local site"); - - vm4.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); - vm5.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); - vm6.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); - vm7.invoke(() -> WANTestBase.verifySenderResumedState( "ln" )); - - try { - inv1.join(); - } catch (InterruptedException e) { - fail("Interrupted the async invocation.", e); - } - - //verify all buckets drained on all sender nodes. - validateParallelSenderQueueAllBucketsDrained(); - - validateRegionSizes(getTestMethodName() + "_PR", 5000, vm2, vm3); - } - - /** - * Since the sender is attached to a region and in use, it can not be - * destroyed. Hence, exception is thrown by the sender API. - */ - @Test - public void testDestroyParallelGatewaySenderExceptionScenario() { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); - - // make sure all the senders are running before doing any puts - waitForSendersRunning(); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - // try destroying on couple of nodes - try { - vm4.invoke(() -> WANTestBase.destroySender( "ln" )); - } - catch (RMIException e) { - assertTrue("Cause of the exception should be GatewaySenderException", e - .getCause() instanceof GatewaySenderException); - } - try { - vm5.invoke(() -> WANTestBase.destroySender( "ln" )); - } - catch (RMIException e) { - assertTrue("Cause of the exception should be GatewaySenderException", e - .getCause() instanceof GatewaySenderException); - } - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 1000 )); - } - - @Test - public void testDestroyParallelGatewaySender() { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true); - - // make sure all the senders are running - waitForSendersRunning(); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", - 1000 )); - - Wait.pause(2000); - - //stop the sender and remove from region before calling destroy on it - stopSenders(); - - vm4.invoke(() -> WANTestBase.removeSenderFromTheRegion( - "ln", getTestMethodName() + "_PR" )); - vm5.invoke(() -> WANTestBase.removeSenderFromTheRegion( - "ln", getTestMethodName() + "_PR" )); - vm6.invoke(() -> WANTestBase.removeSenderFromTheRegion( - "ln", getTestMethodName() + "_PR" )); - vm7.invoke(() -> WANTestBase.removeSenderFromTheRegion( - "ln", getTestMethodName() + "_PR" )); - - vm4.invoke(() -> WANTestBase.destroySender( "ln" )); - vm5.invoke(() -> WANTestBase.destroySender( "ln" )); - vm6.invoke(() -> WANTestBase.destroySender( "ln" )); - vm7.invoke(() -> WANTestBase.destroySender( "ln" )); - - vm4.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true )); - vm5.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true )); - vm6.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true )); - vm7.invoke(() -> WANTestBase.verifySenderDestroyed( "ln", true )); - } - - @Test - public void testParallelGatewaySenderMessageTooLargeException() { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - // Create and start sender with reduced maximum message size and 1 dispatcher thread - String regionName = getTestMethodName() + "_PR"; - vm4.invoke(() -> setMaximumMessageSize( 1024*1024 )); - vm4.invoke(() -> createCache( lnPort )); - vm4.invoke(() -> setNumDispatcherThreadsForTheRun( 1 )); - vm4.invoke(() -> createSender( "ln", 2, true, 100, 100, false, false, null, false )); - vm4.invoke(() -> createPartitionedRegion( regionName, "ln", 0, 100, isOffHeap() )); - - // Do puts - int numPuts = 200; - vm4.invoke(() -> doPuts( regionName, numPuts, new byte[11000] )); - validateRegionSizes(regionName, numPuts, vm4); - - // Start receiver - IgnoredException ignoredMTLE = IgnoredException.addIgnoredException(MessageTooLargeException.class.getName(), vm4); - IgnoredException ignoredGIOE = IgnoredException.addIgnoredException(GemFireIOException.class.getName(), vm4); - vm2.invoke(() -> createCache( nyPort )); - vm2.invoke(() -> createPartitionedRegion( regionName, null, 0, 100, isOffHeap() )); - vm2.invoke(() -> createReceiver()); - validateRegionSizes( regionName, numPuts, vm2 ); - - vm4.invoke(() -> { - final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln"); - assertTrue(sender.getStatistics().getBatchesResized() > 0); - }); - ignoredMTLE.remove(); - ignoredGIOE.remove(); - } - - private void setMaximumMessageSize(int maximumMessageSizeBytes) { - Message.MAX_MESSAGE_SIZE = maximumMessageSizeBytes; - LogWriterUtils.getLogWriter() - .info("Set gemfire.client.max-message-size: " + System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size")); - } - - private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, boolean createAccessors, - boolean startSenders) { - // Note: This is a test-specific method used by several test to create - // receivers, senders and partitioned regions. - createSendersAndReceivers(lnPort, nyPort); - - createPartitionedRegions(createAccessors); - - if (startSenders) { - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - } - } - - private void createSendersAndReceivers(Integer lnPort, Integer nyPort) { - // Note: This is a test-specific method used by several test to create - // receivers and senders. - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); - vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); - vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); - vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true)); - } - - private void createPartitionedRegions(boolean createAccessors) { - // Note: This is a test-specific method used by several test to create - // partitioned regions. - String regionName = getTestMethodName() + "_PR"; - vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - - if (createAccessors) { - vm6.invoke(() -> createPartitionedRegionAsAccessor(regionName, "ln", 1, 100)); - vm7.invoke(() -> createPartitionedRegionAsAccessor(regionName, "ln", 1, 100)); - } else { - vm6.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - vm7.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - } - - vm2.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap())); - } - - private void stopSenders() { - vm4.invoke(() -> stopSender("ln")); - vm5.invoke(() -> stopSender("ln")); - vm6.invoke(() -> stopSender("ln")); - vm7.invoke(() -> stopSender("ln")); - } - - private void waitForSendersRunning() { - vm4.invoke(() -> waitForSenderRunningState("ln")); - vm5.invoke(() -> waitForSenderRunningState("ln")); - vm6.invoke(() -> waitForSenderRunningState("ln")); - vm7.invoke(() -> waitForSenderRunningState("ln")); - } - - private void validateParallelSenderQueueAllBucketsDrained() { - vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java deleted file mode 100644 index a248520..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java +++ /dev/null @@ -1,500 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.parallel; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; -import static org.junit.Assert.*; - -import java.io.File; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.DiskStore; -import com.gemstone.gemfire.cache.DiskStoreFactory; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.wan.GatewayEventFilter; -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.cache30.MyGatewayEventFilter1; -import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1; -import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.internal.cache.RegionQueue; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; -import com.jayway.awaitility.Awaitility; - -/** - * DUnit for ParallelSenderQueue overflow operations. - */ -@Category(DistributedTest.class) -public class ParallelGatewaySenderQueueOverflowDUnitTest extends WANTestBase { - - @Test - public void testParallelSenderQueueEventsOverflow_NoDiskStoreSpecified() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true )); - vm5.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true )); - vm6.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true )); - vm7.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - - //give some time for the senders to pause - Wait.pause(1000); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - - int numEventPuts = 50; - vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), numEventPuts )); - - - //considering a memory limit of 40 MB, maximum of 40 events can be in memory. Rest should be on disk. - Awaitility.await().atMost(60, TimeUnit.SECONDS).until(()-> - { - long numOvVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - - long numMemVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - - LogWriterUtils.getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7); - LogWriterUtils.getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7); - long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; - assertTrue("Total number of entries overflown to disk should be at least greater than 55", (totalOverflown > 55)); - - long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7; - //expected is twice the number of events put due to redundancy level of 1 - assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory)); - - }); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50, 240000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50, 240000 )); - } - - /** - * Keep same max memory limit for all the VMs - */ - @Ignore("TODO: test is disabled") - @Test - public void testParallelSenderQueueEventsOverflow() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - - //give some time for the senders to pause - Wait.pause(1000); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - - int numEventPuts = 50; - vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), numEventPuts )); - - long numOvVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - - long numMemVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - - LogWriterUtils.getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7); - LogWriterUtils.getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7); - - long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; - //considering a memory limit of 40 MB, maximum of 40 events can be in memory. Rest should be on disk. - assertTrue("Total number of entries overflown to disk should be at least greater than 55", (totalOverflown > 55)); - - long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7; - //expected is twice the number of events put due to redundancy level of 1 - assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory)); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 )); - } - - /** - * Set a different memory limit for each VM and make sure that all the VMs are utilized to - * full extent of available memory. - */ - @Ignore("TODO: test is disabled") - @Test - public void testParallelSenderQueueEventsOverflow_2() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 5, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 5, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 20, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - - //give some time for the senders to pause - Wait.pause(1000); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - - int numEventPuts = 50; - vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), numEventPuts )); - - long numOvVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - - long numMemVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - - LogWriterUtils.getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7); - LogWriterUtils.getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7); - - long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; - //considering a memory limit of 40 MB, maximum of 40 events can be in memory. Rest should be on disk. - assertTrue("Total number of entries overflown to disk should be at least greater than 55", (totalOverflown > 55)); - - long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7; - //expected is twice the number of events put due to redundancy level of 1 - assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory)); - - //assert the numbers for each VM - assertTrue("Number of entries in memory VM4 is incorrect. Should be less than 10", (numMemVm4 < 10)); - assertTrue("Number of entries in memory VM5 is incorrect. Should be less than 5", (numMemVm5 < 5)); - assertTrue("Number of entries in memory VM6 is incorrect. Should be less than 5", (numMemVm6 < 5)); - assertTrue("Number of entries in memory VM7 is incorrect. Should be less than 20", (numMemVm7 < 20)); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 )); - } - - @Ignore("TODO: test is disabled") - @Test - public void testParallelSenderQueueNoEventsOverflow() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 10, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm5.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm6.invoke(() -> WANTestBase.pauseSender( "ln" )); - vm7.invoke(() -> WANTestBase.pauseSender( "ln" )); - - //give some time for the senders to pause - Wait.pause(1000); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - - int numEventPuts = 15; - vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), numEventPuts )); - - long numOvVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - long numOvVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesOverflownToDisk( "ln" )); - - long numMemVm4 = (Long) vm4.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm5 = (Long) vm5.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm6 = (Long) vm6.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - long numMemVm7 = (Long) vm7.invoke(() -> WANTestBase.getNumberOfEntriesInVM( "ln" )); - - LogWriterUtils.getLogWriter().info("Entries overflown to disk: " + numOvVm4 + "," + numOvVm5 + "," + numOvVm6 + "," + numOvVm7); - LogWriterUtils.getLogWriter().info("Entries in VM: " + numMemVm4 + "," + numMemVm5 + "," + numMemVm6 + "," + numMemVm7); - - long totalOverflown = numOvVm4 + numOvVm5 + numOvVm6 + numOvVm7; - //all 30 (considering redundant copies) events should accommodate in 40 MB space given to 4 senders - assertEquals("Total number of entries overflown to disk is incorrect", 0, totalOverflown); - - long totalInMemory = numMemVm4 + numMemVm5 + numMemVm6 + numMemVm7; - //expected is twice the number of events put due to redundancy level of 1 - assertEquals("Total number of entries on disk and in VM is incorrect", (numEventPuts*2), (totalOverflown + totalInMemory)); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm7.invoke(() -> WANTestBase.resumeSender( "ln" )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 15 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 15 )); - } - - /** - * Test to validate that ParallelGatewaySenderQueue diskSynchronous attribute - * when persistence of sender is enabled. - */ - @Ignore("TODO: test is disabled") - @Test - public void test_ValidateParallelGatewaySenderQueueAttributes_1() { - Integer localLocPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer remoteLocPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort )); - - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + localLocPort + "]"); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - - File directory = new File("TKSender" + "_disk_" - + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); - directory.mkdir(); - File[] dirs1 = new File[] { directory }; - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - dsf.setDiskDirs(dirs1); - DiskStore diskStore = dsf.create("FORNY"); - - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(true);//set parallel to true - fact.setBatchConflationEnabled(true); - fact.setBatchSize(200); - fact.setBatchTimeInterval(300); - fact.setPersistenceEnabled(true);//enable the persistence - fact.setDiskSynchronous(true); - fact.setDiskStoreName("FORNY"); - fact.setMaximumQueueMemory(200); - fact.setAlertThreshold(1200); - GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); - fact.addGatewayEventFilter(myEventFilter1); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - fact.addGatewayTransportFilter(myStreamFilter2); - final IgnoredException exTKSender = IgnoredException.addIgnoredException("Could not connect"); - try { - GatewaySender sender1 = fact.create("TKSender", 2); - - AttributesFactory factory = new AttributesFactory(); - factory.addGatewaySenderId(sender1.getId()); - factory.setDataPolicy(DataPolicy.PERSISTENT_PARTITION); - Region region = cache.createRegionFactory(factory.create()).create( - "test_ValidateGatewaySenderAttributes"); - Set<GatewaySender> senders = cache.getGatewaySenders(); - assertEquals(senders.size(), 1); - GatewaySender gatewaySender = senders.iterator().next(); - Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender) - .getQueues(); - assertEquals(regionQueues.size(), 1); - RegionQueue regionQueue = regionQueues.iterator().next(); - assertEquals(true, regionQueue.getRegion().getAttributes() - .isDiskSynchronous()); - } finally { - exTKSender.remove(); - } - } - - /** - * Test to validate that ParallelGatewaySenderQueue diskSynchronous attribute - * when persistence of sender is not enabled. - */ - @Ignore("TODO: test is disabled") - @Test - public void test_ValidateParallelGatewaySenderQueueAttributes_2() { - Integer localLocPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer remoteLocPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort )); - - WANTestBase test = new WANTestBase(); - Properties props = test.getDistributedSystemProperties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, "localhost[" + localLocPort + "]"); - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - - GatewaySenderFactory fact = cache.createGatewaySenderFactory(); - fact.setParallel(true);//set parallel to true - fact.setBatchConflationEnabled(true); - fact.setBatchSize(200); - fact.setBatchTimeInterval(300); - fact.setPersistenceEnabled(false);//set persistence to false - fact.setDiskSynchronous(true); - fact.setMaximumQueueMemory(200); - fact.setAlertThreshold(1200); - GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1(); - fact.addGatewayEventFilter(myEventFilter1); - GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1(); - fact.addGatewayTransportFilter(myStreamFilter1); - GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2(); - fact.addGatewayTransportFilter(myStreamFilter2); - final IgnoredException ex = IgnoredException.addIgnoredException("Could not connect"); - try { - GatewaySender sender1 = fact.create("TKSender", 2); - AttributesFactory factory = new AttributesFactory(); - factory.addGatewaySenderId(sender1.getId()); - factory.setDataPolicy(DataPolicy.PARTITION); - Region region = cache.createRegionFactory(factory.create()).create( - "test_ValidateGatewaySenderAttributes"); - Set<GatewaySender> senders = cache.getGatewaySenders(); - assertEquals(senders.size(), 1); - GatewaySender gatewaySender = senders.iterator().next(); - Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender) - .getQueues(); - assertEquals(regionQueues.size(), 1); - RegionQueue regionQueue = regionQueues.iterator().next(); - assertEquals(false, regionQueue.getRegion().getAttributes() - .isDiskSynchronous()); - } finally { - ex.remove(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java deleted file mode 100644 index 87a74b9..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java +++ /dev/null @@ -1,497 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.parallel; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.Random; - -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.IgnoredException; - -/** - * - */ -@Category(DistributedTest.class) -public class ParallelWANConflationDUnitTest extends WANTestBase { - private static final long serialVersionUID = 1L; - - public ParallelWANConflationDUnitTest() { - super(); - } - - @Override - protected final void postSetUpWANTestBase() throws Exception { - IgnoredException.addIgnoredException("java.net.ConnectException"); - } - - @Test - public void testParallelPropagationConflationDisabled() throws Exception { - initialSetUp(); - - createSendersNoConflation(); - - createSenderPRs(); - - startPausedSenders(); - - createReceiverPrs(); - - final Map keyValues = putKeyValues(); - - vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() )); - - final Map updateKeyValues = updateKeyValues(); - - vm4.invoke(() ->checkQueueSize( "ln", (keyValues.size() + updateKeyValues.size()) )); - - vm2.invoke(() ->validateRegionSize( - getTestMethodName(), 0 )); - - resumeSenders(); - - keyValues.putAll(updateKeyValues); - validateReceiverRegionSize(keyValues); - - } - - /** - * This test is disabled as it is not guaranteed to pass it everytime. This - * test is related to the conflation in batch. yet did find any way to - * ascertain that the vents in the batch will always be conflated. - * - * @throws Exception - */ - @Test - public void testParallelPropagationBatchConflation() throws Exception { - initialSetUp(); - - vm4.invoke(() ->createSender( "ln", 2, - true, 100, 50, false, false, null, true )); - vm5.invoke(() ->createSender( "ln", 2, - true, 100, 50, false, false, null, true )); - vm6.invoke(() ->createSender( "ln", 2, - true, 100, 50, false, false, null, true )); - vm7.invoke(() ->createSender( "ln", 2, - true, 100, 50, false, false, null, true )); - - createSenderPRs(); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - pauseSenders(); - - createReceiverPrs(); - - final Map keyValues = new HashMap(); - - for (int i = 1; i <= 10; i++) { - for (int j = 1; j <= 10; j++) { - keyValues.put(j, i) ; - } - vm4.invoke(() ->putGivenKeyValue( - getTestMethodName(), keyValues )); - } - - vm4.invoke(() ->enableConflation( "ln" )); - vm5.invoke(() ->enableConflation( "ln" )); - vm6.invoke(() ->enableConflation( "ln" )); - vm7.invoke(() ->enableConflation( "ln" )); - - resumeSenders(); - - ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> - WANTestBase.getSenderStats( "ln", 0 )); - ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> - WANTestBase.getSenderStats( "ln", 0 )); - ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> - WANTestBase.getSenderStats( "ln", 0 )); - ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> - WANTestBase.getSenderStats( "ln", 0 )); - - assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0); - - vm2.invoke(() ->validateRegionSize( - getTestMethodName(), 10 )); - - } - - @Test - public void testParallelPropagationConflation() throws Exception { - doTestParallelPropagationConflation(0); - } - - @Test - public void testParallelPropagationConflationRedundancy2() throws Exception { - doTestParallelPropagationConflation(2); - } - - public void doTestParallelPropagationConflation(int redundancy) throws Exception { - initialSetUp(); - - createSendersWithConflation(); - - createSenderPRs(redundancy); - - startPausedSenders(); - - createReceiverPrs(); - - final Map keyValues = putKeyValues(); - - vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() )); - final Map updateKeyValues = updateKeyValues(); - - vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); // creates aren't conflated - - vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), updateKeyValues )); - - vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); // creates aren't conflated - - vm2.invoke(() ->validateRegionSize( - getTestMethodName(), 0 )); - - resumeSenders(); - - keyValues.putAll(updateKeyValues); - validateReceiverRegionSize(keyValues); - } - - @Test - public void testParallelPropagationConflationOfRandomKeys() throws Exception { - initialSetUp(); - - createSendersWithConflation(); - - createSenderPRs(); - - startPausedSenders(); - - createReceiverPrs(); - - final Map keyValues = putKeyValues(); - - vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() )); - - final Map updateKeyValues = new HashMap(); - while(updateKeyValues.size()!=10) { - int key = (new Random()).nextInt(keyValues.size()); - updateKeyValues.put(key, key+"_updated"); - } - vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), updateKeyValues )); - - vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); - - vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), updateKeyValues )); - - vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); - - vm2.invoke(() ->validateRegionSize( - getTestMethodName(), 0 )); - - resumeSenders(); - - - keyValues.putAll(updateKeyValues); - validateReceiverRegionSize(keyValues); - - } - - @Test - public void testParallelPropagationColocatedRegionConflation() - throws Exception { - initialSetUp(); - - createSendersWithConflation(); - - createOrderShipmentOnSenders(); - - startPausedSenders(); - - createOrderShipmentOnReceivers(); - - Map custKeyValues = (Map)vm4.invoke(() ->putCustomerPartitionedRegion( 20 )); - Map orderKeyValues = (Map)vm4.invoke(() ->putOrderPartitionedRegion( 20 )); - Map shipmentKeyValues = (Map)vm4.invoke(() ->putShipmentPartitionedRegion( 20 )); - - vm4.invoke(() -> - WANTestBase.checkQueueSize( - "ln", - (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues - .size()) )); - - Map updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 )); - Map updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegion( 10 )); - Map updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegion( 10 )); - int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues - .size()) - + updatedCustKeyValues.size() - + updatedOrderKeyValues.size() - + updatedShipmentKeyValues.size(); - vm4.invoke(() -> - WANTestBase.checkQueueSize( - "ln", - sum)); - - - updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 )); - updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegion( 10 )); - updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegion( 10 )); - int sum2 = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues - .size()) - + updatedCustKeyValues.size() - + updatedOrderKeyValues.size() - + updatedShipmentKeyValues.size(); - vm4.invoke(() -> - WANTestBase.checkQueueSize( - "ln", - sum2)); - - vm2.invoke(() ->validateRegionSize( - WANTestBase.customerRegionName, 0 )); - vm2.invoke(() ->validateRegionSize( - WANTestBase.orderRegionName, 0 )); - vm2.invoke(() ->validateRegionSize( - WANTestBase.shipmentRegionName, 0 )); - - resumeSenders(); - - custKeyValues.putAll(updatedCustKeyValues); - orderKeyValues.putAll(updatedOrderKeyValues); - shipmentKeyValues.putAll(updatedShipmentKeyValues); - - validateColocatedRegionContents(custKeyValues, orderKeyValues, - shipmentKeyValues); - - } - - // - //This is the same as the previous test, except for the UsingCustId methods - @Test - public void testParallelPropagationColocatedRegionConflationSameKey() - throws Exception { - initialSetUp(); - - createSendersWithConflation(); - - createOrderShipmentOnSenders(); - - startPausedSenders(); - - createOrderShipmentOnReceivers(); - - Map custKeyValues = (Map)vm4.invoke(() ->putCustomerPartitionedRegion( 20 )); - Map orderKeyValues = (Map)vm4.invoke(() ->putOrderPartitionedRegionUsingCustId( 20 )); - Map shipmentKeyValues = (Map)vm4.invoke(() ->putShipmentPartitionedRegionUsingCustId( 20 )); - - vm4.invoke(() ->checkQueueSize( "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues - .size()) )); - - Map updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 )); - Map updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegionUsingCustId( 10 )); - Map updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegionUsingCustId( 10 )); - int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues - .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() ; - - vm4.invoke(() ->checkQueueSize( "ln", sum)); - - updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 )); - updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegionUsingCustId( 10 )); - updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegionUsingCustId( 10 )); - - int sum2 = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues - .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size(); - vm4.invoke(() ->checkQueueSize( "ln", sum2)); - - vm2.invoke(() ->validateRegionSize( - WANTestBase.customerRegionName, 0 )); - vm2.invoke(() ->validateRegionSize( - WANTestBase.orderRegionName, 0 )); - vm2.invoke(() ->validateRegionSize( - WANTestBase.shipmentRegionName, 0 )); - - resumeSenders(); - - custKeyValues.putAll(updatedCustKeyValues); - orderKeyValues.putAll(updatedOrderKeyValues); - shipmentKeyValues.putAll(updatedShipmentKeyValues); - - validateColocatedRegionContents(custKeyValues, orderKeyValues, - shipmentKeyValues); - } - - protected void validateColocatedRegionContents(Map custKeyValues, - Map orderKeyValues, Map shipmentKeyValues) { - vm2.invoke(() ->validateRegionSize( - WANTestBase.customerRegionName, custKeyValues.size() )); - vm2.invoke(() ->validateRegionSize( - WANTestBase.orderRegionName, orderKeyValues.size() )); - vm2.invoke(() ->validateRegionSize( - WANTestBase.shipmentRegionName, shipmentKeyValues.size() )); - - vm2.invoke(() ->validateRegionContents( - WANTestBase.customerRegionName, custKeyValues )); - vm2.invoke(() ->validateRegionContents( - WANTestBase.orderRegionName, orderKeyValues )); - vm2.invoke(() ->validateRegionContents( - WANTestBase.shipmentRegionName, shipmentKeyValues )); - - vm3.invoke(() ->validateRegionSize( - WANTestBase.customerRegionName, custKeyValues.size() )); - vm3.invoke(() ->validateRegionSize( - WANTestBase.orderRegionName, orderKeyValues.size() )); - vm3.invoke(() ->validateRegionSize( - WANTestBase.shipmentRegionName, shipmentKeyValues.size() )); - - vm3.invoke(() ->validateRegionContents( - WANTestBase.customerRegionName, custKeyValues )); - vm3.invoke(() ->validateRegionContents( - WANTestBase.orderRegionName, orderKeyValues )); - vm3.invoke(() ->validateRegionContents( - WANTestBase.shipmentRegionName, shipmentKeyValues )); - } - - protected void createOrderShipmentOnReceivers() { - vm2.invoke(() ->createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap() )); - vm3.invoke(() ->createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap() )); - } - - protected void createOrderShipmentOnSenders() { - vm4.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() )); - vm5.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() )); - vm6.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() )); - vm7.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() )); - } - - protected Map updateKeyValues() { - final Map updateKeyValues = new HashMap(); - for(int i=0;i<10;i++) { - updateKeyValues.put(i, i+"_updated"); - } - - vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), updateKeyValues )); - return updateKeyValues; - } - - protected Map putKeyValues() { - final Map keyValues = new HashMap(); - for(int i=0; i< 20; i++) { - keyValues.put(i, i); - } - - - vm4.invoke(() ->putGivenKeyValue( getTestMethodName(), keyValues )); - return keyValues; - } - - protected void validateReceiverRegionSize(final Map keyValues) { - vm2.invoke(() ->validateRegionSize( - getTestMethodName(), keyValues.size() )); - vm3.invoke(() ->validateRegionSize( - getTestMethodName(), keyValues.size() )); - - vm2.invoke(() ->validateRegionContents( - getTestMethodName(), keyValues )); - vm3.invoke(() ->validateRegionContents( - getTestMethodName(), keyValues )); - } - - protected void resumeSenders() { - vm4.invoke(() ->resumeSender( "ln" )); - vm5.invoke(() ->resumeSender( "ln" )); - vm6.invoke(() ->resumeSender( "ln" )); - vm7.invoke(() ->resumeSender( "ln" )); - } - - protected void createReceiverPrs() { - vm2.invoke(() ->createPartitionedRegion( - getTestMethodName(), null, 1, 8, isOffHeap() )); - vm3.invoke(() ->createPartitionedRegion( - getTestMethodName(), null, 1, 8, isOffHeap() )); - } - - protected void startPausedSenders() { - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - pauseSenders(); - } - - protected void pauseSenders() { - vm4.invoke(() ->pauseSender( "ln" )); - vm5.invoke(() ->pauseSender( "ln" )); - vm6.invoke(() ->pauseSender( "ln" )); - vm7.invoke(() ->pauseSender( "ln" )); - } - - protected void createSenderPRs() { - createSenderPRs(0); - } - - protected void createSenderPRs(int redundancy) { - vm4.invoke(() ->createPartitionedRegion( - getTestMethodName(), "ln", redundancy, 8, isOffHeap() )); - vm5.invoke(() ->createPartitionedRegion( - getTestMethodName(), "ln", redundancy, 8, isOffHeap() )); - vm6.invoke(() ->createPartitionedRegion( - getTestMethodName(), "ln", redundancy, 8, isOffHeap() )); - vm7.invoke(() ->createPartitionedRegion( - getTestMethodName(), "ln", redundancy, 8, isOffHeap() )); - } - - protected void initialSetUp() { - Integer lnPort = (Integer)vm0.invoke(() ->createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() ->createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - } - - protected void createSendersNoConflation() { - vm4.invoke(() ->createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() ->createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() ->createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() ->createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - } - - protected void createSendersWithConflation() { - vm4.invoke(() ->createSender( "ln", 2, - true, 100, 2, true, false, null, true )); - vm5.invoke(() ->createSender( "ln", 2, - true, 100, 2, true, false, null, true )); - vm6.invoke(() ->createSender( "ln", 2, - true, 100, 2, true, false, null, true )); - vm7.invoke(() ->createSender( "ln", 2, - true, 100, 2, true, false, null, true )); - } - -}
