http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java new file mode 100644 index 0000000..41f01f3 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java @@ -0,0 +1,625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.concurrent.TimeUnit; + +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionDestroyedException; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.Assert; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.jayway.awaitility.Awaitility; + +/** + * + */ +@Category(DistributedTest.class) +public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + public ConcurrentParallelGatewaySenderOperation_2_DUnitTest() { + super(); + } + + @Override + protected final void postSetUpWANTestBase() throws Exception { + IgnoredException.addIgnoredException("RegionDestroyedException"); + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + } + + // to test that when userPR is locally destroyed, shadow Pr is also locally + // destroyed and on recreation userPr , shadow Pr is also recreated. + @Test + public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_RecreateRegion() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + try { + String regionName = getTestMethodName() + "_PR"; + + createCacheInVMs(lnPort, vm4); + vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1); + vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); + vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY)); + vm4.invoke(() -> startSender("ln")); + vm4.invoke(() -> pauseSender("ln")); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm2.invoke(() -> createReceiver()); + + vm4.invoke(() -> doPuts(regionName, 10)); + vm4.invoke(() -> validateRegionSize(regionName, 10)); + + vm2.invoke(() -> validateRegionSize(regionName, 0)); + + vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR")); + + vm2.invoke(() -> validateRegionSize(regionName, 0)); + + vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); + vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); + + vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 0))); + + vm4.invoke(() -> validateRegionSize(regionName, 10)); + } finally { + vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0); + } + } + + @Test + public void testParallelGatewaySender_SingleNode_UserPR_Destroy_RecreateRegion() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + try { + String regionName = getTestMethodName() + "_PR"; + + createCacheInVMs(lnPort, vm4); + vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1); + vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); + vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 4, OrderPolicy.KEY)); + vm4.invoke(() -> startSender("ln")); + vm4.invoke(() -> pauseSender("ln")); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm2.invoke(() -> createReceiver()); + + vm4.invoke(() -> doPuts(regionName, 10)); + vm4.invoke(() -> validateRegionSize(regionName, 10)); + + vm2.invoke(() -> validateRegionSize(regionName, 0)); + + vm4.invoke(() -> resumeSender("ln")); + vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln")); + vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR")); + + vm2.invoke(() -> validateRegionSize(regionName, 10)); + + vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); + vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); + + vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 20))); + + vm4.invoke(() -> validateRegionSize(regionName, 10)); + + } finally { + vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0); + } + } + + @Test + public void testParallelGatewaySender_SingleNode_UserPR_Close_RecreateRegion() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + try { + String regionName = getTestMethodName() + "_PR"; + createCacheInVMs(lnPort, vm4); + vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1); + vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); + vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 7, OrderPolicy.KEY)); + vm4.invoke(() -> startSender("ln")); + vm4.invoke(() -> pauseSender("ln")); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm2.invoke(() -> createReceiver()); + + vm4.invoke(() -> doPuts(regionName, 10)); + vm4.invoke(() -> validateRegionSize(regionName, 10)); + vm4.invoke(() -> closeRegion(getTestMethodName() + "_PR")); + vm4.invoke(() -> resumeSender("ln")); + + Thread.sleep(500); + + vm2.invoke(() -> validateRegionSize(regionName, 0)); + + vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); + vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); + + vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 10))); + + vm4.invoke(() -> validateRegionSize(regionName, 10)); + } + finally { + vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0); + } + } + + @Test + public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultaneousPut_RecreateRegion() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + try { + createAndStartSender(vm4, lnPort, 6, false, true); + + vm4.invoke(() -> addCacheListenerAndDestroyRegion(getTestMethodName() + "_PR")); + + createReceiverAndDoPutsInPausedSender(nyPort); + + vm4.invoke(() -> resumeSender("ln")); + + AsyncInvocation putAsync = vm4.invokeAsync(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", 10, 101 )); + try { + putAsync.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail("Interrupted the async invocation."); + } + + if (putAsync.getException() != null + && !(putAsync.getException() instanceof RegionDestroyedException)) { + Assert.fail("Expected RegionDestroyedException but got", + putAsync.getException()); + } + + // before destroy, there is wait for queue to drain, so data will be + // dispatched + vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 10, 101)); // possible size is more than 10 + + vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); + + vm4.invoke(() -> doPutsFrom(getTestMethodName() + "_PR", 10, 20)); + + vm4.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 10)); + + vm2.invoke(() -> validateRegionSizeWithinRange(getTestMethodName() + "_PR", 20, 101)); // possible size is more than 20 + } finally { + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + } + } + + @Test + public void testParallelGatewaySender_SingleNode_UserPR_Destroy_NodeDown() + throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + try { + createAndStartSender(vm4, lnPort, 5, false, true); + createAndStartSender(vm5, lnPort, 5, false, true); + createAndStartSender(vm6, lnPort, 5, false, true); + + createReceiverAndDoPutsInPausedSender(nyPort); + + vm4.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm5.invoke(() -> WANTestBase.resumeSender( "ln" )); + vm6.invoke(() -> WANTestBase.resumeSender( "ln" )); + + Wait.pause(200); + AsyncInvocation localDestroyAsync = vm4.invokeAsync(() -> WANTestBase.destroyRegion( getTestMethodName() + "_PR" )); + + AsyncInvocation closeAsync = vm4.invokeAsync(() -> WANTestBase.closeCache()); + try { + localDestroyAsync.join(); + closeAsync.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail("Interrupted the async invocation."); + } + + vm2.invoke(() -> validateRegionSize(getTestMethodName() + "_PR", 10)); + } finally { + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm6.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + } + + } + + @Test + public void testParallelGatewaySender_SingleNode_UserPR_Close_SimultaneousPut_RecreateRegion() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + try { + String regionName = getTestMethodName() + "_PR"; + + createCacheInVMs(lnPort, vm4); + vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1); + vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); + vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY)); + vm4.invoke(() -> startSender("ln")); + vm4.invoke(() -> pauseSender("ln")); + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm2.invoke(() -> createReceiver()); + + vm4.invoke(() -> doPuts(regionName, 10)); + vm4.invoke(() -> validateRegionSize(regionName, 10)); + + vm2.invoke(() -> validateRegionSize(regionName, 0)); + + AsyncInvocation putAsync = vm4.invokeAsync(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", 10, 2000 )); + AsyncInvocation localDestroyAsync = vm4.invokeAsync(() -> ConcurrentParallelGatewaySenderOperation_2_DUnitTest. + closeRegion( getTestMethodName() + "_PR" )); + try { + putAsync.join(); + localDestroyAsync.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail("Interrupted the async invocation."); + } + vm2.invoke(() -> validateRegionSize(regionName, 0)); + + vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); + vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); + + vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 0))); + + vm4.invoke(() -> validateRegionSize(regionName, 10)); + } finally { + vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0); + } + } + + @Test + public void testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort)); + Integer pnPort = (Integer)vm3.invoke(() -> createFirstRemoteLocator(4, lnPort)); + + createCacheInVMs(nyPort, vm4); + vm4.invoke(() -> createReceiver()); + createCacheInVMs(tkPort, vm5); + vm5.invoke(() -> createReceiver()); + createCacheInVMs(pnPort, vm6); + vm6.invoke(() -> createReceiver()); + + try { + vm7.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(lnPort)); + + LogWriterUtils.getLogWriter().info("Created cache on local site"); + + vm7.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY)); + vm7.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY)); + vm7.invoke(() -> createConcurrentSender("ln3", 4, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY)); + + vm7.invoke(() -> startSender("ln1")); + vm7.invoke(() -> startSender("ln2")); + vm7.invoke(() -> startSender("ln3")); + + String regionName = getTestMethodName() + "_PR"; + vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap())); + + LogWriterUtils.getLogWriter().info("Created PRs on local site"); + + vm4.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm5.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm6.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + + vm7.invoke(() -> doPuts(regionName, 10)); + + vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln1")); + vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln2")); + vm7.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln3")); + + vm7.invoke(() -> localDestroyRegion(regionName)); + + vm7.invoke(() -> createPartitionedRegion(regionName, "ln1,ln2,ln3", 1, 10, isOffHeap())); + + vm7.invoke(() -> doPutsFrom(regionName, 10, 20)); + + vm7.invoke(() -> validateRegionSize(regionName, 10)); + + validateRegionSizes(regionName, 20, vm4, vm5, vm6); + } finally { + vm7.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + } + } + + @Test + public void testParallelGatewaySender_MultipleNode_UserPR_localDestroy_Recreate() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> createReceiver()); + + try { + createAndStartSender(vm4, lnPort, 5, true, false); + createAndStartSender(vm5, lnPort, 5, true, false); + + String regionName = getTestMethodName() + "_PR"; + vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( regionName, 10 )); + Wait.pause(1000); + vm5.invoke(() -> localDestroyRegion(regionName)); + + try { + inv1.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + fail("Interrupted the async invocation."); + } + + + validateRegionSizes(regionName, 10, vm4, vm2); + + vm5.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap())); + + vm4.invoke(() -> doPutsFrom(regionName, 10, 20)); + + validateRegionSizes(regionName, 20, vm4, vm2); + } finally { + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + } + } + + @Test + public void testParallelGatewaySenders_MultipleNode_UserPR_localDestroy_Recreate() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort)); + + createCacheInVMs(nyPort, vm6); + vm6.invoke(() -> createReceiver()); + createCacheInVMs(tkPort, vm7); + vm7.invoke(() -> createReceiver()); + + try { + createAndStartTwoSenders(vm4, lnPort, 4); + createAndStartTwoSenders(vm5, lnPort, 4); + + String regionName = getTestMethodName() + "_PR"; + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + regionName, null, 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + regionName, null, 1, 100, isOffHeap() )); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( regionName, 10 )); + + Wait.pause(1000); + vm5.invoke(() -> WANTestBase.localDestroyRegion( regionName )); + + try { + inv1.join(); + } catch (InterruptedException ex) { + ex.printStackTrace(); + fail("Interrupted the async invocation."); + } + + validateRegionSizes(regionName, 10, vm4, vm6, vm7); + + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + regionName, "ln1,ln2", 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPutsFrom( + regionName, 10, 20 )); + + validateRegionSizes(regionName, 20, vm4, vm6, vm7); + } finally { + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + } + } + + @Test + public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> createReceiver()); + + try { + createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 5, true); + createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 5, true); + + LogWriterUtils.getLogWriter().info("Created PRs on local site"); + + vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 100, isOffHeap())); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion( 10 )); + Wait.pause(1000); + + try { + vm5.invoke(() -> localDestroyRegion(customerRegionName)); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof UnsupportedOperationException); + } + + try { + inv1.join(); + } catch (Exception e) { + Assert.fail("Unexpected exception", e); + } + + validateRegionSizes(customerRegionName, 10, vm4, vm5, vm2); + } finally { + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + } + } + + @Test + public void testParallelGatewaySender_ColocatedPartitionedRegions_destroy() throws Exception { + Integer[] locatorPorts = createLNAndNYLocators(); + Integer lnPort = locatorPorts[0]; + Integer nyPort = locatorPorts[1]; + + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> WANTestBase.createReceiver()); + + try { + createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 6, true); + createAndStartSenderWithCustomerOrderShipmentRegion(vm5, lnPort, 6, true); + + LogWriterUtils.getLogWriter().info("Created PRs on local site"); + + vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, 1, 100, isOffHeap() )); + + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion( 2000 )); + Wait.pause(1000); + + try { + vm5.invoke(() -> WANTestBase.destroyRegion( customerRegionName )); + } catch (Exception ex) { + assertTrue(ex.getCause() instanceof IllegalStateException); + return; + } + fail("Expected UnsupportedOperationException"); + } finally { + vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + vm5.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME()); + } + } + + public static void clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME() { + AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0; + } + + public static void closeRegion(String regionName) { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + r.close(); + } + + public static void validateRegionSizeWithinRange(String regionName, + final int min, final int max) { + final Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + if (r.keySet().size() > min && r.keySet().size() <= max) { + return true; + } + return false; + } + + public String description() { + return "Expected region entries to be within range : " + min + " " + + max + " but actual entries: " + r.keySet().size(); + } + }; + Wait.waitForCriterion(wc, 120000, 500, true); + } + + protected static void createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME( + Integer locPort) { + createCache(false, locPort); + AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1; + } + + protected void createAndStartSender(VM vm, int port, int concurrencyLevel, boolean manualStart, boolean pause) { + vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port)); + vm.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap())); + createSender(vm, concurrencyLevel, manualStart); + vm.invoke(() -> startSender("ln")); + if (pause) { + vm.invoke(() -> pauseSender("ln")); + } + LogWriterUtils.getLogWriter().info("Created PRs on local site"); + } + + protected void createReceiverAndDoPutsInPausedSender(int port) { + // Note: This is a test-specific method used by several tests to do puts from vm4 to vm2. + String regionName = getTestMethodName() + "_PR"; + createCacheInVMs(port, vm2); + vm2.invoke(() -> createReceiver()); + vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap())); + vm4.invoke(() -> doPuts(regionName, 10)); + vm4.invoke(() -> validateRegionSize(regionName, 10)); + // since sender is paused, no dispatching + vm2.invoke(() -> validateRegionSize(regionName, 0)); + } + + protected void createAndStartTwoSenders(VM vm, int port, int concurrencyLevel) { + // Note: This is a test-specific method used to create and start 2 senders. + vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port)); + vm.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln1,ln2", 1, 100, isOffHeap())); + createSenders(vm, concurrencyLevel); + vm.invoke(() -> startSender("ln1")); + vm.invoke(() -> startSender("ln2")); + } + + protected void createAndStartSenderWithCustomerOrderShipmentRegion(VM vm, int port, int concurrencyLevel, boolean manualStart) { + vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port)); + vm.invoke(() -> createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap())); + createSender(vm, concurrencyLevel, manualStart); + vm.invoke(() -> startSender("ln")); + } + + protected void createSender(VM vm, int concurrencyLevel, boolean manualStart) { + vm.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, manualStart, concurrencyLevel, OrderPolicy.KEY)); + } + + protected void createSenders(VM vm, int concurrencyLevel) { + vm.invoke(() -> createConcurrentSender("ln1", 2, true, 100, 10, false, false, null, true, concurrencyLevel, OrderPolicy.KEY)); + vm.invoke(() -> createConcurrentSender("ln2", 3, true, 100, 10, false, false, null, true, concurrencyLevel, OrderPolicy.KEY)); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java new file mode 100644 index 0000000..f09e19c --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsDUnitTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import java.util.Set; + +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderOperationsDUnitTest; +import com.gemstone.gemfire.test.dunit.VM; + +/** + * + */ +@Category(DistributedTest.class) +public class ConcurrentSerialGatewaySenderOperationsDUnitTest extends SerialGatewaySenderOperationsDUnitTest { + + private static final long serialVersionUID = 1L; + + public ConcurrentSerialGatewaySenderOperationsDUnitTest() { + super(); + } + + protected void createSenderVM5() { + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, true, null, true, 5, OrderPolicy.KEY )); + } + + protected void createSenderVM4() { + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, true, null, true, 5, OrderPolicy.KEY )); + } + + protected void validateQueueClosedVM4() { + vm4.invoke(() -> WANTestBase.validateQueueClosedForConcurrentSerialGatewaySender( "ln")); + } + + private void validateQueueContents(VM vm, String site, int size) { + vm.invoke(() -> WANTestBase.validateQueueContentsForConcurrentSerialGatewaySender( site, size)); + } + + public static void verifySenderPausedState(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + assertTrue(sender.isPaused()); + } + + public static void verifySenderResumedState(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + assertFalse(sender.isPaused()); + assertTrue(sender.isRunning()); + } + + public static void verifySenderStoppedState(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + AbstractGatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = (AbstractGatewaySender)s; + break; + } + } + assertFalse(sender.isRunning()); + assertFalse(sender.isPaused()); + } + + public static void verifyGatewaySenderOperations(String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + assertFalse(sender.isPaused()); + assertFalse(((AbstractGatewaySender)sender).isRunning()); + sender.pause(); + sender.resume(); + sender.stop(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java new file mode 100644 index 0000000..3a0caef --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +@SuppressWarnings("serial") +@Category(DistributedTest.class) +public class ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest extends + ConcurrentSerialGatewaySenderOperationsDUnitTest { + + public ConcurrentSerialGatewaySenderOperationsOffHeapDUnitTest() { + super(); + } + + @Override + public boolean isOffHeap() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java new file mode 100644 index 0000000..ead62e0 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_1_DUnitTest.java @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.EntryExistsException; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.internal.cache.wan.BatchException70; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; + +/** + * All the test cases are similar to SerialWANPropagationDUnitTest except that + * the we create concurrent serial GatewaySender with concurrency of 4 + * + */ +@Category(DistributedTest.class) +public class ConcurrentWANPropagation_1_DUnitTest extends WANTestBase { + + /** + * @param name + */ + public ConcurrentWANPropagation_1_DUnitTest() { + super(); + } + + private static final long serialVersionUID = 1L; + + /** + * All the test cases are similar to SerialWANPropagationDUnitTest + * @throws Exception + */ + @Test + public void testReplicatedSerialPropagation_withoutRemoteSite() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //keep the batch size high enough to reduce the number of exceptions in the log + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 400, false, false, null, true, 4, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 400, false, false, null, true, 4, OrderPolicy.KEY )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + createCacheInVMs(nyPort, vm2, vm3); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + vm2.invoke(() -> WANTestBase.createReceiver()); + vm3.invoke(() -> WANTestBase.createReceiver()); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testReplicatedSerialPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + + @Test + public void testReplicatedSerialPropagationWithLocalSiteClosedAndRebuilt() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + //---------close local site and build again----------------------------------------- + vm4.invoke(() -> WANTestBase.killSender( )); + vm5.invoke(() -> WANTestBase.killSender( )); + vm6.invoke(() -> WANTestBase.killSender( )); + vm7.invoke(() -> WANTestBase.killSender( )); + + Integer regionSize = + (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_RR" )); + LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + IgnoredException.addIgnoredException(EntryExistsException.class.getName()); + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + //---------------------------------------------------------------------------------- + + //verify remote site receives all the events + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + /** + * Two regions configured with the same sender and put is in progress + * on both the regions. + * One of the two regions is destroyed in the middle. + * + * @throws Exception + */ + @Test + public void testReplicatedSerialPropagationWithLocalRegionDestroy() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + //these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //senders are created on local site + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 20, false, false, null, true, 3, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 20, false, false, null, true ,3, OrderPolicy.THREAD)); + + //create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + //create another RR (RR_2) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + + //start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + //create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + //create another RR (RR_2) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + //do puts in RR_2 in main thread + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 500 )); + //destroy RR_2 after above puts are complete + vm4.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); + + try { + inv1.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + //sleep for some time to let all the events propagate to remote site + Thread.sleep(20); + //vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 )); + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_2", 500 )); + } + + /** + * 1 region and sender configured on local site and 1 region and a + * receiver configured on remote site. Puts to the local region are in progress. + * Remote region is destroyed in the middle. + * + * @throws Exception + */ + @Test + public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + + //these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //senders are created on local site + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 500, false, false, null, true, 5, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 500, false, false, null, true, 5, OrderPolicy.KEY )); + + //create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + //start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + //create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 10000 )); + //destroy RR_1 in remote site + vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_1")); + + try { + inv1.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + //verify that all is well in local site. All the events should be present in local region + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 10000 )); + //assuming some events might have been dispatched before the remote region was destroyed, + //sender's region queue will have events less than 1000 but the queue will not be empty. + //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of + //more in depth validations. + vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmptyForConcurrentSender("ln" )); + } + + /** + * Two regions configured in local with the same sender and put is in progress + * on both the regions. Same two regions are configured on remote site as well. + * One of the two regions is destroyed in the middle on remote site. + * + * @throws Exception + */ + @Test + public void testReplicatedSerialPropagationWithRemoteRegionDestroy2() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + //these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + //these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //senders are created on local site + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); + + //create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + //create another RR (RR_2) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + + //start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + //create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", "ln", isOffHeap() )); + + //create another RR (RR_2) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", "ln", isOffHeap() )); + //destroy RR_2 on remote site in the middle + vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + "_RR_2")); + + //expected exceptions in the logs + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + //start puts in RR_2 in another thread + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + + try { + inv1.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + //though region RR_2 is destroyed, RR_1 should still get all the events put in it + //in local site + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 1000 )); + + } + + @Test + public void testReplicatedSerialPropagationWithRemoteRegionDestroy3() + throws Exception { + final String senderId = "ln"; + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + // these are part of remote site + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + // these are part of local site + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + // senders are created on local site + vm4.invoke(() -> WANTestBase.createConcurrentSender( senderId, 2, + false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( senderId, 2, + false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD )); + + // create one RR (RR_1) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", null, isOffHeap() )); + + // create another RR (RR_2) on remote site + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", null, isOffHeap() )); + + + // start the senders on local site + startSenderInVMs("ln", vm4, vm5); + + // create one RR (RR_1) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", senderId, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", senderId, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", senderId, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_1", senderId, isOffHeap() )); + + // create another RR (RR_2) on local site + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", senderId, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", senderId, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", senderId, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR_2", senderId, isOffHeap() )); + + IgnoredException.addIgnoredException(BatchException70.class.getName()); + IgnoredException.addIgnoredException(ServerOperationException.class.getName()); + + // start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 1000 )); + // start puts in RR_2 in another thread + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_2", 1000 )); + // destroy RR_2 on remote site in the middle + vm2.invoke(() -> WANTestBase.destroyRegion( getTestMethodName() + + "_RR_2" )); + + try { + inv1.join(); + inv2.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + // though region RR_2 is destroyed, RR_1 should still get all the events put + // in it + // in local site + try { + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR_1", 1000 )); + } finally { + System.setProperty( + DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); + vm4.invoke(new CacheSerializableRunnable("UnSetting system property ") { + public void run2() throws CacheException { + System.setProperty( + DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); + } + }); + + vm5.invoke(new CacheSerializableRunnable("UnSetting system property ") { + public void run2() throws CacheException { + System.setProperty( + DistributionConfig.GEMFIRE_PREFIX + "GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); + } + }); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java new file mode 100644 index 0000000..cfe4169 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentWANPropagation_2_DUnitTest.java @@ -0,0 +1,448 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import org.junit.Ignore; +import org.junit.experimental.categories.Category; +import org.junit.Test; + +import static org.junit.Assert.*; + +import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase; +import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.AsyncInvocation; +import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.junit.categories.FlakyTest; + +/** + * All the test cases are similar to SerialWANPropagationDUnitTest except that + * the we create concurrent serial GatewaySender with concurrency of 4 + */ +@Category(DistributedTest.class) +public class ConcurrentWANPropagation_2_DUnitTest extends WANTestBase { + + public ConcurrentWANPropagation_2_DUnitTest() { + super(); + } + + private static final long serialVersionUID = 1L; + + @Test + public void testSerialReplicatedWanWithOverflow() { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + //keep the maxQueueMemory low enough to trigger eviction + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 10, 5, false, false, null, true, 5, OrderPolicy.KEY )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 10, 5, false, false, null, true, 5, OrderPolicy.KEY )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + vm2.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); + vm3.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR")); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doHeavyPuts( + getTestMethodName() + "_RR", 15 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 15, 240000)); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 15, 240000 )); + } + + @Ignore("Bug46921") + @Test + public void testSerialReplicatedWanWithPersistence() { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + + } + + @Test + public void testReplicatedSerialPropagationToTwoWanSites() throws Exception { + + Integer lnPort = createFirstLocatorWithDSId(1); + Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort )); + + createCacheInVMs(nyPort, vm2); + createCacheInVMs(tkPort, vm3); + vm2.invoke(() -> WANTestBase.createReceiver()); + vm3.invoke(() -> WANTestBase.createReceiver()); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial1", + 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "lnSerial2", + 3, false, 100, 10, false, false, null, true, 5 , OrderPolicy.THREAD)); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("lnSerial1", vm4, vm5); + + startSenderInVMs("lnSerial2", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "lnSerial1,lnSerial2", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testReplicatedSerialPropagationHA() throws Exception { + IgnoredException.addIgnoredException("Broken pipe"); + IgnoredException.addIgnoredException("Connection reset"); + IgnoredException.addIgnoredException("Unexpected IOException"); + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 10000 )); + Wait.pause(2000); + AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender()); + + inv1.join(); + inv2.join(); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 10000 )); + } + + @Test + public void testReplicatedSerialPropagationWithConflation() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testReplicatedSerialPropagationWithParallelThreads() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD )); + + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( + getTestMethodName() + "_RR", 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000 )); + } + + @Test + public void testSerialPropagationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName(), null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + } + + @Test + public void testReplicatedSerialPropagationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), null, isOffHeap() )); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD )); + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm5.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm6.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + vm7.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName(), "ln", isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + vm3.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName(), 800 )); + } + + @Test + public void testNormalRegionSerialPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + vm2.invoke(() -> WANTestBase.createCache(nyPort)); + vm2.invoke(() -> WANTestBase.createReplicatedRegion( + getTestMethodName() + "_RR", null, isOffHeap() )); + vm2.invoke(() -> WANTestBase.createReceiver()); + + WANTestBase.createCacheInVMs(lnPort, vm4, vm5); + + vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD )); + + + startSenderInVMs("ln", vm4, vm5); + + vm4.invoke(() -> WANTestBase.createNormalRegion( + getTestMethodName() + "_RR", "ln" )); + vm5.invoke(() -> WANTestBase.createNormalRegion( + getTestMethodName() + "_RR", "ln" )); + + vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", + 1000 )); + + vm4.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, + 0, 0, 0)); + + vm5.invoke(() -> WANTestBase.checkQueueStats( "ln", 0, + 1000, 0, 0 )); + + vm5.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 1000)); + + vm4.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_RR", 0 )); + + vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(0, 0, 0)); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/disttx/DistTXWANDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/disttx/DistTXWANDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/disttx/DistTXWANDUnitTest.java new file mode 100644 index 0000000..0ac13d4 --- /dev/null +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/disttx/DistTXWANDUnitTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.cache.wan.disttx; + +import static com.gemstone.gemfire.distributed.ConfigurationProperties.*; + +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.test.dunit.Invoke; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.SerializableCallable; +import com.gemstone.gemfire.test.junit.categories.DistributedTest; + +@Category(DistributedTest.class) +public class DistTXWANDUnitTest extends WANTestBase { + + @Override + protected final void postSetUpWANTestBase() throws Exception { + Invoke.invokeInEveryVM(new SerializableCallable() { + @Override + public Object call() throws Exception { + System.setProperty(DistributionConfig.GEMFIRE_PREFIX + LOG_LEVEL, LogWriterUtils.getDUnitLogLevel()); + return null; + } + }); + } + + /** + * Disabled because it hangs with current implementation of notifying + * adjunct receivers by sending DistTXAdjunctCommitMessage from primary at the + * time of commit. + */ + @Ignore("TODO: test is disabled") + @Test + public void testPartitionedSerialPropagation_SenderSameAsCoordinator() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm4.invoke(() -> WANTestBase.doDistTXPuts( getTestMethodName() + "_PR", + 50 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 50 )); + } + + @Test + public void testPartitionedSerialPropagation_SenderNotSameAsCoordinator() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + false, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + vm6.invoke(() -> WANTestBase.doDistTXPuts( getTestMethodName() + "_PR", + 50 )); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 50 )); + } + + @Test + public void testPartitionedRegionParallelPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); + Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); + + createCacheInVMs(nyPort, vm2, vm3); + createReceiverInVMs(vm2, vm3); + + createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); + + vm4.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm5.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm6.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + vm7.invoke(() -> WANTestBase.createSender( "ln", 2, + true, 100, 10, false, false, null, true )); + + vm4.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm5.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm6.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + vm7.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() )); + + startSenderInVMs("ln", vm4, vm5, vm6, vm7); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + vm3.invoke(() -> WANTestBase.createPartitionedRegion( + getTestMethodName() + "_PR", null, 1, 100, isOffHeap() )); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" )); + + vm4.invoke(() -> WANTestBase.doDistTXPuts( getTestMethodName() + "_PR", + 5 )); + + //verify all buckets drained on all sender nodes. + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + + vm2.invoke(() -> WANTestBase.validateRegionSize( + getTestMethodName() + "_PR", 5 )); + } +}
