http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java deleted file mode 100644 index 41f01f3..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java +++ /dev/null @@ -1,625 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.concurrent.TimeUnit; - -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionDestroyedException; -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.Assert; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; -import com.jayway.awaitility.Awaitility; - -/** - * - */ -@Category(DistributedTest.class) -public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTestBase { - - private static final long serialVersionUID = 1L; - - public ConcurrentParallelGatewaySenderOperation_2_DUnitTest() { - super(); - } - - @Override - protected final void postSetUpWANTestBase() throws Exception { - IgnoredException.addIgnoredException("RegionDestroyedException"); - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - } - - // to test that when userPR is locally destroyed, shadow Pr is also locally - // destroyed and on recreation userPr , shadow Pr is also recreated. - @Test - public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_RecreateRegion() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - try { - String regionName = getTestMethodName() + "_PR"; - - createCacheInVMs(lnPort, vm4); - vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1); - vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); - vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY)); - vm4.invoke(() -> startSender("ln")); - vm4.invoke(() -> pauseSender("ln")); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - vm2.invoke(() -> createReceiver()); - - vm4.invoke(() -> doPuts(regionName, 10)); - vm4.invoke(() -> validateRegionSize(regionName, 10)); - - vm2.invoke(() -> validateRegionSize(regionName, 0)); - - vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR")); - - vm2.invoke(() -> validateRegionSize(regionName, 0)); - - vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); - vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); - - vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 0))); - - vm4.invoke(() -> validateRegionSize(regionName, 10)); - } finally { - vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0); - } - } - - @Test - public void testParallelGatewaySender_SingleNode_UserPR_Destroy_RecreateRegion() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - try { - String regionName = getTestMethodName() + "_PR"; - - createCacheInVMs(lnPort, vm4); - vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1); - vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); - vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 4, OrderPolicy.KEY)); - vm4.invoke(() -> startSender("ln")); - vm4.invoke(() -> pauseSender("ln")); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - vm2.invoke(() -> createReceiver()); - - vm4.invoke(() -> doPuts(regionName, 10)); - vm4.invoke(() -> validateRegionSize(regionName, 10)); - - vm2.invoke(() -> validateRegionSize(regionName, 0)); - - vm4.invoke(() -> resumeSender("ln")); - vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); - vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR")); - - vm2.invoke(() -> validateRegionSize(regionName, 10)); - - vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); - vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); - - vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 20))); - - vm4.invoke(() -> validateRegionSize(regionName, 10)); - - } finally { - vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0); - } - } - - @Test - public void testParallelGatewaySender_SingleNode_UserPR_Close_RecreateRegion() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - try { - String regionName = getTestMethodName() + "_PR"; - createCacheInVMs(lnPort, vm4); - vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1); - vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); - vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 7, OrderPolicy.KEY)); - vm4.invoke(() -> startSender("ln")); - vm4.invoke(() -> pauseSender("ln")); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - vm2.invoke(() -> createReceiver()); - - vm4.invoke(() -> doPuts(regionName, 10)); - vm4.invoke(() -> validateRegionSize(regionName, 10)); - vm4.invoke(() -> closeRegion(getTestMethodName() + "_PR")); - vm4.invoke(() -> resumeSender("ln")); - - Thread.sleep(500); - - vm2.invoke(() -> validateRegionSize(regionName, 0)); - - vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); - vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); - - vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 10))); - - vm4.invoke(() -> validateRegionSize(regionName, 10)); - } - finally { - vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0); - } - } - - @Test - public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultaneousPut_RecreateRegion() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - try { - createAndStartSender(vm4, lnPort, 6, false, true); - - vm4.invoke(() -> addCacheListenerAndDestroyRegion(getTestMethodName() + "_PR")); - - createReceiverAndDoPutsInPausedSender(nyPort); - - vm4.invoke(() -> resumeSender("ln")); - - AsyncInvocation putAsync = vm4.invokeAsync(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", 10, 101 )); - try { - putAsync.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail("Interrupted the async invocation."); - } - - if (putAsync.getException() != null - && !(putAsync.getException() instanceof RegionDestroyedException)) { - Assert.fail("Expected RegionDestroyedException but got", - putAsync.getException()); - } - - // before destroy, there is wait for queue to drain, so data will be - // dispatched - vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 10, 101)); // possible size is more than 10 - - vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); - - vm4.invoke(() -> doPutsFrom(getTestMethodName() + "_PR", 10, 20)); - - vm4.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 10)); - - vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 20, 101)); // possible size is more than 20 - } finally { - vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - } - } - - @Test - public void testParallelGatewaySender_SingleNode_UserPR_Destroy_NodeDown() - throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - try { - createAndStartSender(vm4, lnPort, 5, false, true); - createAndStartSender(vm5, lnPort, 5, false, true); - createAndStartSender(vm6, lnPort, 5, false, true); - - createReceiverAndDoPutsInPausedSender(nyPort); - - vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); - vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); - - Wait.pause(200); - AsyncInvocation localDestroyAsync = vm4.invokeAsync(() -> WANTestBase.destroyRegion( getTestMethodName() + "_PR" )); - - AsyncInvocation closeAsync = vm4.invokeAsync(() -> WANTestBase.closeCache()); - try { - localDestroyAsync.join(); - closeAsync.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail("Interrupted the async invocation."); - } - - vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 10)); - } finally { - vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - vm6.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - } - - } - - @Test - public void testParallelGatewaySender_SingleNode_UserPR_Close_SimultaneousPut_RecreateRegion() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - try { - String regionName = getTestMethodName() + "_PR"; - - createCacheInVMs(lnPort, vm4); - vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1); - vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); - vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY)); - vm4.invoke(() -> startSender("ln")); - vm4.invoke(() -> pauseSender("ln")); - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - vm2.invoke(() -> createReceiver()); - - vm4.invoke(() -> doPuts(regionName, 10)); - vm4.invoke(() -> validateRegionSize(regionName, 10)); - - vm2.invoke(() -> validateRegionSize(regionName, 0)); - - AsyncInvocation putAsync = vm4.invokeAsync(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", 10, 2000 )); - AsyncInvocation localDestroyAsync = vm4.invokeAsync(() -> ConcurrentParallelGatewaySenderOperation_2_DUnitTest. - closeRegion( getTestMethodName() + "_PR" )); - try { - putAsync.join(); - localDestroyAsync.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail("Interrupted the async invocation."); - } - vm2.invoke(() -> validateRegionSize(regionName, 0)); - - vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); - vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); - - vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 0))); - - vm4.invoke(() -> validateRegionSize(regionName, 10)); - } finally { - vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0); - } - } - - @Test - public void testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort)); - Integer pnPort = (Integer)vm3.invoke(() -> createFirstRemoteLocator(4, lnPort)); - - createCacheInVMs(nyPort, vm4); - vm4.invoke(() -> createReceiver()); - createCacheInVMs(tkPort, vm5); - vm5.invoke(() -> createReceiver()); - createCacheInVMs(pnPort, vm6); - vm6.invoke(() -> createReceiver()); - - try { - vm7.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(lnPort)); - - LogWriterUtils.getLogWriter().info("Created cache on local site"); - - vm7.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY)); - vm7.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY)); - vm7.invoke(() -> createConcurrentSender("ln3", 4, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY)); - - vm7.invoke(() -> startSender("ln1")); - vm7.invoke(() -> startSender("ln2")); - vm7.invoke(() -> startSender("ln3")); - - String regionName = getTestMethodName() + "_PR"; - vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap())); - - LogWriterUtils.getLogWriter().info("Created PRs on local site"); - - vm4.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - vm5.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - vm6.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - - vm7.invoke(() -> doPuts(regionName, 10)); - - vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln1")); - vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln2")); - vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln3")); - - vm7.invoke(() -> localDestroyRegion(regionName)); - - vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap())); - - vm7.invoke(() -> doPutsFrom(regionName, 10, 20)); - - vm7.invoke(() -> validateRegionSize(regionName, 10)); - - validateRegionSizes(regionName, 20, vm4, vm5, vm6); - } finally { - vm7.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - } - } - - @Test - public void testParallelGatewaySender_MultipleNode_UserPR_localDestroy_Recreate() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> createReceiver()); - - try { - createAndStartSender(vm4, lnPort, 5, true, false); - createAndStartSender(vm5, lnPort, 5, true, false); - - String regionName = getTestMethodName() + "_PR"; - vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( regionName, 10 )); - Wait.pause(1000); - vm5.invoke(() -> localDestroyRegion(regionName)); - - try { - inv1.join(); - } catch (InterruptedException ex) { - ex.printStackTrace(); - fail("Interrupted the async invocation."); - } - - - validateRegionSizes(regionName, 10, vm4, vm2); - - vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); - - vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); - - validateRegionSizes(regionName, 20, vm4, vm2); - } finally { - vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - } - } - - @Test - public void testParallelGatewaySenders_MultipleNode_UserPR_localDestroy_Recreate() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort)); - - createCacheInVMs(nyPort, vm6); - vm6.invoke(() -> createReceiver()); - createCacheInVMs(tkPort, vm7); - vm7.invoke(() -> createReceiver()); - - try { - createAndStartTwoSenders(vm4, lnPort, 4); - createAndStartTwoSenders(vm5, lnPort, 4); - - String regionName = getTestMethodName() + "_PR"; - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - regionName, null, 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - regionName, null, 1, 100, isOffHeap() )); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( regionName, 10 )); - - Wait.pause(1000); - vm5.invoke(() -> WANTestBase.localDestroyRegion( regionName )); - - try { - inv1.join(); - } catch (InterruptedException ex) { - ex.printStackTrace(); - fail("Interrupted the async invocation."); - } - - validateRegionSizes(regionName, 10, vm4, vm6, vm7); - - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - regionName, "ln1,ln2", 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPutsFrom( - regionName, 10, 20 )); - - validateRegionSizes(regionName, 20, vm4, vm6, vm7); - } finally { - vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - } - } - - @Test - public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> createReceiver()); - - try { - createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 5, true); - createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 5, true); - - LogWriterUtils.getLogWriter().info("Created PRs on local site"); - - vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 100, isOffHeap())); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion( 10 )); - Wait.pause(1000); - - try { - vm5.invoke(() -> localDestroyRegion(customerRegionName)); - } catch (Exception ex) { - assertTrue(ex.getCause() instanceof UnsupportedOperationException); - } - - try { - inv1.join(); - } catch (Exception e) { - Assert.fail("Unexpected exception", e); - } - - validateRegionSizes(customerRegionName, 10, vm4, vm5, vm2); - } finally { - vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - } - } - - @Test - public void testParallelGatewaySender_ColocatedPartitionedRegions_destroy() throws Exception { - Integer[] locatorPorts = createLNAndNYLocators(); - Integer lnPort = locatorPorts[0]; - Integer nyPort = locatorPorts[1]; - - createCacheInVMs(nyPort, vm2); - vm2.invoke(() -> WANTestBase.createReceiver()); - - try { - createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 6, true); - createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 6, true); - - LogWriterUtils.getLogWriter().info("Created PRs on local site"); - - vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, 1, 100, isOffHeap() )); - - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion( 2000 )); - Wait.pause(1000); - - try { - vm5.invoke(() -> WANTestBase.destroyRegion( customerRegionName )); - } catch (Exception ex) { - assertTrue(ex.getCause() instanceof IllegalStateException); - return; - } - fail("Expected UnsupportedOperationException"); - } finally { - vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); - } - } - - public static void clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME() { - AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0; - } - - public static void closeRegion(String regionName) { - Region r = cache.getRegion(Region.SEPARATOR + regionName); - assertNotNull(r); - r.close(); - } - - public static void validateRegionSizeWithinRange(String regionName, - final int min, final int max) { - final Region r = cache.getRegion(Region.SEPARATOR + regionName); - assertNotNull(r); - WaitCriterion wc = new WaitCriterion() { - public boolean done() { - if (r.keySet().size() > min && r.keySet().size() <= max) { - return true; - } - return false; - } - - public String description() { - return "Expected region entries to be within range : " + min + " " - + max + " but actual entries: " + r.keySet().size(); - } - }; - Wait.waitForCriterion(wc, 120000, 500, true); - } - - protected static void createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME( - Integer locPort) { - createCache(false, locPort); - AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1; - } - - protected void createAndStartSender(VM vm, int port, int concurrencyLevel, boolean manualStart, boolean pause) { - vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port)); - vm.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); - createSender(vm, concurrencyLevel, manualStart); - vm.invoke(() -> startSender("ln")); - if (pause) { - vm.invoke(() -> pauseSender("ln")); - } - LogWriterUtils.getLogWriter().info("Created PRs on local site"); - } - - protected void createReceiverAndDoPutsInPausedSender(int port) { - // Note: This is a test-specific method used by several tests to do puts from vm4 to vm2. - String regionName = getTestMethodName() + "_PR"; - createCacheInVMs(port, vm2); - vm2.invoke(() -> createReceiver()); - vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); - vm4.invoke(() -> doPuts(regionName, 10)); - vm4.invoke(() -> validateRegionSize(regionName, 10)); - // since sender is paused, no dispatching - vm2.invoke(() -> validateRegionSize(regionName, 0)); - } - - protected void createAndStartTwoSenders(VM vm, int port, int concurrencyLevel) { - // Note: This is a test-specific method used to create and start 2 senders. - vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port)); - vm.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln1,ln2", 1, 100, isOffHeap())); - createSenders(vm, concurrencyLevel); - vm.invoke(() -> startSender("ln1")); - vm.invoke(() -> startSender("ln2")); - } - - protected void createAndStartSenderWithCustomerOrderShipmentRegion(VM vm, int port, int concurrencyLevel, boolean manualStart) { - vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port)); - vm.invoke(() -> createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap())); - createSender(vm, concurrencyLevel, manualStart); - vm.invoke(() -> startSender("ln")); - } - - protected void createSender(VM vm, int concurrencyLevel, boolean manualStart) { - vm.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, manualStart, concurrencyLevel, OrderPolicy.KEY)); - } - - protected void createSenders(VM vm, int concurrencyLevel) { - vm.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true, concurrencyLevel, OrderPolicy.KEY)); - vm.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true, concurrencyLevel, OrderPolicy.KEY)); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java deleted file mode 100644 index f09e19c..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import java.util.Set; - -import com.gemstone.gemfire.cache.wan.GatewaySender; -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderOperationsDUnitTest; -import com.gemstone.gemfire.test.dunit.VM; - -/** - * - */ -@Category(DistributedTest.class) -public class ConcurrentSerialGatewaySenderOperationsDUnitTest extends SerialGatewaySenderOperationsDUnitTest { - - private static final long serialVersionUID = 1L; - - public ConcurrentSerialGatewaySenderOperationsDUnitTest() { - super(); - } - - protected void createSenderVM5() { - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, true, null, true, 5, OrderPolicy.KEY )); - } - - protected void createSenderVM4() { - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, true, null, true, 5, OrderPolicy.KEY )); - } - - protected void validateQueueClosedVM4() { - vm4.invoke(() -> WANTestBase.validateQueueClosedForConcurrentSerialGatewaySender( "ln")); - } - - private void validateQueueContents(VM vm, String site, int size) { - vm.invoke(() -> WANTestBase.validateQueueContentsForConcurrentSerialGatewaySender( site, size)); - } - - public static void verifySenderPausedState(String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender)s; - break; - } - } - assertTrue(sender.isPaused()); - } - - public static void verifySenderResumedState(String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender)s; - break; - } - } - assertFalse(sender.isPaused()); - assertTrue(sender.isRunning()); - } - - public static void verifySenderStoppedState(String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - AbstractGatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = (AbstractGatewaySender)s; - break; - } - } - assertFalse(sender.isRunning()); - assertFalse(sender.isPaused()); - } - - public static void verifyGatewaySenderOperations(String senderId) { - Set<GatewaySender> senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } - assertFalse(sender.isPaused()); - assertFalse(((AbstractGatewaySender)sender).isRunning()); - sender.pause(); - sender.resume(); - sender.stop(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java deleted file mode 100644 index 3a0caef..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -@SuppressWarnings("serial") -@Category(DistributedTest.class) -public class ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest extends - ConcurrentSerialGatewaySenderOperationsDUnitTest { - - public ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest() { - super(); - } - - @Override - public boolean isOffHeap() { - return true; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java deleted file mode 100644 index ead62e0..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java +++ /dev/null @@ -1,568 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import com.gemstone.gemfire.cache.CacheException; -import com.gemstone.gemfire.cache.EntryExistsException; -import com.gemstone.gemfire.cache.client.ServerOperationException; -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.cache30.CacheSerializableRunnable; -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.internal.cache.wan.BatchException70; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; - -/** - * All the test cases are similar to SerialWANPropagationDUnitTest except that - * the we create concurrent serial GatewaySender with concurrency of 4 - * - */ -@Category(DistributedTest.class) -public class ConcurrentWANPropagation_1_DUnitTest extends WANTestBase { - - /** - * @param name - */ - public ConcurrentWANPropagation_1_DUnitTest() { - super(); - } - - private static final long serialVersionUID = 1L; - - /** - * All the test cases are similar to SerialWANPropagationDUnitTest - * @throws Exception - */ - @Test - public void testReplicatedSerialPropagation_withoutRemoteSite() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //keep the batch size high enough to reduce the number of exceptions in the log - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 400, false, false, null, true, 4, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 400, false, false, null, true, 4, OrderPolicy.KEY )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - createCacheInVMs(nyPort, vm2, vm3); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testReplicatedSerialPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - - @Test - public void testReplicatedSerialPropagationWithLocalSiteClosedAndRebuilt() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - //---------close local site and build again----------------------------------------- - vm4.invoke(() -> WANTestBase.killSender( )); - vm5.invoke(() -> WANTestBase.killSender( )); - vm6.invoke(() -> WANTestBase.killSender( )); - vm7.invoke(() -> WANTestBase.killSender( )); - - Integer regionSize = - (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_RR" )); - LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - IgnoredException.addIgnoredException(EntryExistsException.class.getName()); - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - //---------------------------------------------------------------------------------- - - //verify remote site receives all the events - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - /** - * Two regions configured with the same sender and put is in progress - * on both the regions. - * One of the two regions is destroyed in the middle. - * - * @throws Exception - */ - @Test - public void testReplicatedSerialPropagationWithLocalRegionDestroy() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 20, false, false, null, true, 3, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 20, false, false, null, true ,3, OrderPolicy.THREAD)); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - //create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - //do puts in RR_2 in main thread - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 500 )); - //destroy RR_2 after above puts are complete - vm4.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); - - try { - inv1.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - //sleep for some time to let all the events propagate to remote site - Thread.sleep(20); - //vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 )); - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_2", 500 )); - } - - /** - * 1 region and sender configured on local site and 1 region and a - * receiver configured on remote site. Puts to the local region are in progress. - * Remote region is destroyed in the middle. - * - * @throws Exception - */ - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 500, false, false, null, true, 5, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 500, false, false, null, true, 5, OrderPolicy.KEY )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 10000 )); - //destroy RR_1 in remote site - vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_1")); - - try { - inv1.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - - //verify that all is well in local site. All the events should be present in local region - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 10000 )); - //assuming some events might have been dispatched before the remote region was destroyed, - //sender's region queue will have events less than 1000 but the queue will not be empty. - //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of - //more in depth validations. - vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmptyForConcurrentSender("ln" )); - } - - /** - * Two regions configured in local with the same sender and put is in progress - * on both the regions. Same two regions are configured on remote site as well. - * One of the two regions is destroyed in the middle on remote site. - * - * @throws Exception - */ - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy2() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - //these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - //these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //senders are created on local site - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); - - //create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - //create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - - //start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - //create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", "ln", isOffHeap() )); - - //create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", "ln", isOffHeap() )); - //destroy RR_2 on remote site in the middle - vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); - - //expected exceptions in the logs - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - //start puts in RR_2 in another thread - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); - - //start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - - try { - inv1.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - //though region RR_2 is destroyed, RR_1 should still get all the events put in it - //in local site - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - - } - - @Test - public void testReplicatedSerialPropagationWithRemoteRegionDestroy3() - throws Exception { - final String senderId = "ln"; - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - // these are part of remote site - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - // these are part of local site - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - // senders are created on local site - vm4.invoke(() -> WANTestBase.createConcurrentSender( senderId, 2, - false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( senderId, 2, - false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); - - // create one RR (RR_1) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", null, isOffHeap() )); - - // create another RR (RR_2) on remote site - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", null, isOffHeap() )); - - - // start the senders on local site - startSenderInVMs("ln", vm4, vm5); - - // create one RR (RR_1) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", senderId, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", senderId, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", senderId, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_1", senderId, isOffHeap() )); - - // create another RR (RR_2) on local site - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", senderId, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", senderId, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", senderId, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR_2", senderId, isOffHeap() )); - - IgnoredException.addIgnoredException(BatchException70.class.getName()); - IgnoredException.addIgnoredException(ServerOperationException.class.getName()); - - // start puts in RR_1 in another thread - AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); - // start puts in RR_2 in another thread - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); - // destroy RR_2 on remote site in the middle - vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() - + "_RR_2" )); - - try { - inv1.join(); - inv2.join(); - } catch (InterruptedException e) { - e.printStackTrace(); - fail(); - } - // though region RR_2 is destroyed, RR_1 should still get all the events put - // in it - // in local site - try { - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR_1", 1000 )); - } finally { - System.setProperty( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); - vm4.invoke(new CacheSerializableRunnable("UnSetting system property ") { - public void run2() throws CacheException { - System.setProperty( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); - } - }); - - vm5.invoke(new CacheSerializableRunnable("UnSetting system property ") { - public void run2() throws CacheException { - System.setProperty( - DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); - } - }); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java deleted file mode 100644 index cfe4169..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java +++ /dev/null @@ -1,448 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.concurrent; - -import org.junit.Ignore; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; -import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.AsyncInvocation; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.junit.categories.FlakyTest; - -/** - * All the test cases are similar to SerialWANPropagationDUnitTest except that - * the we create concurrent serial GatewaySender with concurrency of 4 - */ -@Category(DistributedTest.class) -public class ConcurrentWANPropagation_2_DUnitTest extends WANTestBase { - - public ConcurrentWANPropagation_2_DUnitTest() { - super(); - } - - private static final long serialVersionUID = 1L; - - @Test - public void testSerialReplicatedWanWithOverflow() { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - //keep the maxQueueMemory low enough to trigger eviction - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 10, 5, false, false, null, true, 5, OrderPolicy.KEY )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 10, 5, false, false, null, true, 5, OrderPolicy.KEY )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - vm2.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); - vm3.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doHeavyPuts( - getTestMethodName() + "_RR", 15 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 15, 240000)); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 15, 240000 )); - } - - @Ignore("Bug46921") - @Test - public void testSerialReplicatedWanWithPersistence() { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - - } - - @Test - public void testReplicatedSerialPropagationToTwoWanSites() throws Exception { - - Integer lnPort = createFirstLocatorWithDSId(1); - Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); - - createCacheInVMs(nyPort, vm2); - createCacheInVMs(tkPort, vm3); - vm2.invoke(() -> WANTestBase.createReceiver()); - vm3.invoke(() -> WANTestBase.createReceiver()); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial1", - 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial2", - 3, false, 100, 10, false, false, null, true, 5 , OrderPolicy.THREAD)); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("lnSerial1", vm4, vm5); - - startSenderInVMs("lnSerial2", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testReplicatedSerialPropagationHA() throws Exception { - IgnoredException.addIgnoredException("Broken pipe"); - IgnoredException.addIgnoredException("Connection reset"); - IgnoredException.addIgnoredException("Unexpected IOException"); - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); - Wait.pause(2000); - AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); - - inv1.join(); - inv2.join(); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 10000 )); - } - - @Test - public void testReplicatedSerialPropagationWithConflation() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testReplicatedSerialPropagationWithParallelThreads() - throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD )); - - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( - getTestMethodName() + "_RR", 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000 )); - } - - @Test - public void testSerialPropagationWithFilter() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName(), null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - } - - @Test - public void testReplicatedSerialPropagationWithFilter() throws Exception { - - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), null, isOffHeap() )); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, - new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm5.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm6.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - vm7.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName(), "ln", isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - vm3.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName(), 800 )); - } - - @Test - public void testNormalRegionSerialPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - vm2.invoke(() -> WANTestBase.createCache(nyPort)); - vm2.invoke(() -> WANTestBase.createReplicatedRegion( - getTestMethodName() + "_RR", null, isOffHeap() )); - vm2.invoke(() -> WANTestBase.createReceiver()); - - WANTestBase.createCacheInVMs(lnPort, vm4, vm5); - - vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, - false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); - - - startSenderInVMs("ln", vm4, vm5); - - vm4.invoke(() -> WANTestBase.createNormalRegion( - getTestMethodName() + "_RR", "ln" )); - vm5.invoke(() -> WANTestBase.createNormalRegion( - getTestMethodName() + "_RR", "ln" )); - - vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, - 0, 0, 0)); - - vm5.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, - 1000, 0, 0 )); - - vm5.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 1000)); - - vm4.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0)); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_RR", 0 )); - - vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0)); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java deleted file mode 100644 index 0ac13d4..0000000 --- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.wan.disttx; - -import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; - -import org.junit.Ignore; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.internal.cache.wan.WANTestBase; -import com.gemstone.gemfire.test.dunit.Invoke; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.SerializableCallable; -import com.gemstone.gemfire.test.junit.categories.DistributedTest; - -@Category(DistributedTest.class) -public class DistTXWANDUnitTest extends WANTestBase { - - @Override - protected final void postSetUpWANTestBase() throws Exception { - Invoke.invokeInEveryVM(new SerializableCallable() { - @Override - public Object call() throws Exception { - System.setProperty(DistributionConfig.GEMFIRE_PREFIX + LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); - return null; - } - }); - } - - /** - * Disabled because it hangs with current implementation of notifying - * adjunct receivers by sending DistTXAdjunctCommitMessage from primary at the - * time of commit. - */ - @Ignore("TODO: test is disabled") - @Test - public void testPartitionedSerialPropagation_SenderSameAsCoordinator() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - vm4.invoke(() -> WANTestBase.doDistTXPuts( getTestMethodName() + "_PR", - 50 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 50 )); - } - - @Test - public void testPartitionedSerialPropagation_SenderNotSameAsCoordinator() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - false, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - vm6.invoke(() -> WANTestBase.doDistTXPuts( getTestMethodName() + "_PR", - 50 )); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 50 )); - } - - @Test - public void testPartitionedRegionParallelPropagation() throws Exception { - Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); - Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); - - createCacheInVMs(nyPort, vm2, vm3); - createReceiverInVMs(vm2, vm3); - - createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); - - vm4.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm5.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm6.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - vm7.invoke(() -> WANTestBase.createSender( "ln", 2, - true, 100, 10, false, false, null, true )); - - vm4.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm5.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm6.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - vm7.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); - - startSenderInVMs("ln", vm4, vm5, vm6, vm7); - - vm2.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - vm3.invoke(() -> WANTestBase.createPartitionedRegion( - getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); - - //before doing any puts, let the senders be running in order to ensure that - //not a single event will be lost - vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); - - vm4.invoke(() -> WANTestBase.doDistTXPuts( getTestMethodName() + "_PR", - 5 )); - - //verify all buckets drained on all sender nodes. - vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); - - vm2.invoke(() -> WANTestBase.validateRegionSize( - getTestMethodName() + "_PR", 5 )); - } -}
