http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..19edd7d
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+import com.jayway.awaitility.Awaitility;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.cache.EntryExistsException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import 
com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+import java.net.SocketException;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test the functionality of ParallelGatewaySender with multiple dispatchers.
+ *
+ */
+@Category(DistributedTest.class)
+public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
+  
+  public ConcurrentParallelGatewaySenderDUnitTest() {
+    // Nothing to initialise here; the WANTestBase no-arg constructor is
+    // invoked implicitly.
+  }
+  
+  /**
+   * Normal happy scenario test case.
+   * Checks that the concurrent dispatchers of the sender members have,
+   * between them, dispatched every event that was put.
+   * <p>
+   * vm0 runs createFirstLocatorWithDSId(1) and vm1 runs
+   * createFirstRemoteLocator(2, lnPort). vm2 and vm3 create a cache on nyPort,
+   * a receiver and the partitioned region; vm4 to vm7 create a cache on
+   * lnPort, the concurrent sender "ln" (OrderPolicy.PARTITION) and the same
+   * partitioned region created with "ln". With the test hook switched on in
+   * vm4 to vm7, vm4 performs doPuts with a count of 1000, the "ln" queue
+   * buckets are validated as drained on vm4 to vm7 and the region size on vm2
+   * is validated to be 1000. The values returned by
+   * verifyAndGetEventsDispatchedByConcurrentDispatchers on vm4 to vm7 must add
+   * up to 1000. The test hook is switched off again in the finally block.
+   * <p>
+   * NOTE(review): the check that each individual dispatcher dispatched
+   * something is presumably made inside
+   * verifyAndGetEventsDispatchedByConcurrentDispatchers -- confirm in
+   * WANTestBase.
+   * 
+   * @throws Exception if any step of the distributed scenario fails
+   */
+  @Test
+  public void testParallelPropagationConcurrentArtifacts() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    try {
+      // set the test hook to find out the events dispatched by each of the
+      // concurrent dispatchers
+      vm4.invoke(() -> 
ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE ));
+      vm5.invoke(() -> 
ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE ));
+      vm6.invoke(() -> 
ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE ));
+      vm7.invoke(() -> 
ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.TRUE ));
+
+      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+          1000 ));
+
+      // verify all buckets drained on all sender nodes.
+      vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+      vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+      vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+      vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+
+      vm2.invoke(() -> WANTestBase.validateRegionSize(
+          getTestMethodName() + "_PR", 1000 ));
+
+      int dispatched1 = (Integer)vm4.invoke(() -> 
WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" ));
+      int dispatched2 = (Integer)vm5.invoke(() -> 
WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" ));
+      int dispatched3 = (Integer)vm6.invoke(() -> 
WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" ));
+      int dispatched4 = (Integer)vm7.invoke(() -> 
WANTestBase.verifyAndGetEventsDispatchedByConcurrentDispatchers( "ln" ));
+
+      assertEquals(1000, dispatched1 + dispatched2 + dispatched3 + 
dispatched4);
+    }
+    finally {
+      vm4.invoke(() -> 
ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE ));
+      vm5.invoke(() -> 
ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE ));
+      vm6.invoke(() -> 
ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE ));
+      vm7.invoke(() -> 
ConcurrentParallelGatewaySenderDUnitTest.setTestHook("ln", Boolean.FALSE ));
+    }
+  }
+  
+  /**
+   * Normal happy scenario test case.
+   * vm0 and vm1 create the two locators, vm2 and vm3 create a cache with a
+   * receiver, and vm4 to vm7 create a cache with the concurrent sender "ln"
+   * (OrderPolicy.PARTITION). All six cache members create the partitioned
+   * region named after this test method plus "_PR": vm4 to vm7 create it with
+   * "ln", vm2 and vm3 with null. Once waitForSenderRunningState has returned
+   * on vm4 to vm7, vm4 performs doPuts with a count of 1000; the test then
+   * validates that the "ln" queue buckets are drained on vm4 to vm7 and that
+   * the region size on vm2 is 1000.
+   * 
+   * @throws Exception if any step of the distributed scenario fails
+   */
+  @Test
+  public void testParallelPropagation() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+  
+  
+  /**
+   * Normal happy scenario test case when bucket division tests boundary cases.
+   * The senders on vm4 to vm7 are created with 7 as the tenth argument of
+   * createConcurrentSender, while the partitioned region is created with 100
+   * as its fourth argument.
+   * NOTE(review): presumably these are the dispatcher count and the bucket
+   * count, so the buckets cannot be shared out equally -- confirm the
+   * parameter meanings in WANTestBase.
+   * The remote members vm2 and vm3 create their region before their receivers.
+   * After vm4 performs doPuts with a count of 1000, the "ln" queue buckets must
+   * be drained on vm4 to vm7 and the region size on vm2 must be 1000.
+   * 
+   * @throws Exception if any step of the distributed scenario fails
+   */
+  @Test
+  public void testParallelPropagationWithUnEqualBucketDivision() throws 
Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+  
+  
+  /**
+   * Initially the GatewayReceiver is not up but later it comes up.
+   * We verify that the events put while the remote site was still down are
+   * delivered once the remote caches, regions and receivers have been
+   * created: the "ln" queue buckets must drain on vm4 to vm7 and the region
+   * size on vm2 must reach 1000.
+   * 
+   * @throws Exception if any step of the distributed scenario fails
+   */
+  @Test
+  public void testParallelPropagation_withoutRemoteSite() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //keep a larger batch to minimize number of exception occurrences in the log
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //make sure all the senders are running before doing any puts
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+    //the puts happen while no remote cache, region or receiver exists yet
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+      1000 ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    createReceiverInVMs(vm2, vm3);
+
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    // Just making sure that though the remote site is started later,
+    // remote site is still able to get the data. The partitioned region is
+    // created on vm2 and vm3 before their receivers are started, so a region
+    // already exists on the remote members when the receivers come up.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+  
+  /**
+   * Testing for colocated region with orderPolicy Partition.
+   * vm4 to vm7 create the concurrent sender "ln" (OrderPolicy.PARTITION) and
+   * call createCustomerOrderShipmentPartitionedRegion; vm2 and vm3 create a
+   * receiver and call the same region helper. After vm4 runs
+   * putcolocatedPartitionedRegion with a count of 1000 and the "ln" queue
+   * buckets are validated as drained on vm4 to vm7, the regions named by
+   * customerRegionName, orderRegionName and shipmentRegionName must each have
+   * a size of 1000 on vm2.
+   * <p>
+   * NOTE(review): vm2 and vm3 pass "ln" to the region helper although no
+   * sender is created on them, whereas the other tests in this class pass
+   * null on the receiving members -- confirm this is intended.
+   */
+  @Test
+  public void testParallelPropagationColocatedPartitionedRegions() {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
+
+
+    vm4.invoke(() -> 
WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, 
isOffHeap() ));
+    vm5.invoke(() -> 
WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, 
isOffHeap() ));
+    vm6.invoke(() -> 
WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, 
isOffHeap() ));
+    vm7.invoke(() -> 
WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, 
isOffHeap() ));
+
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> 
WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, 
isOffHeap() ));
+    vm3.invoke(() -> 
WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, 
isOffHeap() ));
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    vm4.invoke(() -> WANTestBase.putcolocatedPartitionedRegion( 1000 ));
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      customerRegionName, 1000 ));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      orderRegionName, 1000 ));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      shipmentRegionName, 1000 ));
+  }
+  
+  /**
+   * Local and remote sites are up and running.
+   * Local site cache is closed and the site is built again.
+   * Puts are done to local site.
+   * Expected: Remote site should receive all the events put after the local
+   * site was built back.
+   * <p>
+   * Sequence: vm4 performs doPuts with a count of 1000 and vm2 is validated to
+   * hold 1000 entries; killSender is invoked on vm4 to vm7; the cache, the
+   * concurrent sender "ln" and the partitioned region are created again on
+   * vm4 to vm7 and the senders are restarted; vm4 performs doPuts with a
+   * count of 10000; finally the "ln" queue buckets must be drained on vm4 to
+   * vm7 and the region size must be 10000 on both vm2 and vm3.
+   * 
+   * @throws Exception if any step of the distributed scenario fails
+   */
+  @Test
+  public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws 
Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+      1000 ));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+              getTestMethodName() + "_PR", 1000 ));
+    //-------------------Close and rebuild local site ---------------------------------
+
+    vm4.invoke(() -> WANTestBase.killSender());
+    vm5.invoke(() -> WANTestBase.killSender());
+    vm6.invoke(() -> WANTestBase.killSender());
+    vm7.invoke(() -> WANTestBase.killSender());
+    
+    // the remote region size is only logged here, it is not asserted
+    Integer regionSize = 
+      (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() 
+ "_PR" ));
+    LogWriterUtils.getLogWriter().info("Region size on remote is: " + 
regionSize);
+    
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION ));
+    
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    //------------------------------------------------------------------------------------
+    
+    IgnoredException.addIgnoredException(EntryExistsException.class.getName());
+    IgnoredException.addIgnoredException(BatchException70.class.getName());
+    
IgnoredException.addIgnoredException(ServerOperationException.class.getName());
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 ));
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 10000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 10000 ));
+  }
+  
+  /**
+   * Colocated regions using ConcurrentParallelGatewaySender.
+   * Normal scenario with OrderPolicy.KEY: vm4 to vm7 create the concurrent
+   * sender "ln" and call createColocatedPartitionedRegions with "ln", vm2 and
+   * vm3 create a receiver and call the same helper with null. Once the
+   * senders report the running state, vm4 performs doPuts with a count of
+   * 1000 on the region named after this test method; the "ln" queue buckets
+   * must then be drained on vm4 to vm7 and that region must have a size of
+   * 1000 on vm2.
+   * 
+   * @throws Exception if any step of the distributed scenario fails
+   */
+  @Test
+  public void testParallelColocatedPropagation() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
+
+    vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+  }
+  
+  
+  /**
+   * Colocated regions using ConcurrentParallelGatewaySender.
+   * Normal scenario with OrderPolicy.PARTITION: vm4 to vm7 create the
+   * concurrent sender "ln" and call createColocatedPartitionedRegions with
+   * "ln", vm2 and vm3 create a receiver and call the same helper with null.
+   * Once the senders report the running state, vm4 performs doPuts with a
+   * count of 1000 on the region named after this test method; the "ln" queue
+   * buckets must then be drained on vm4 to vm7 and that region must have a
+   * size of 1000 on vm2.
+   * 
+   * @throws Exception if any step of the distributed scenario fails
+   */
+  @Test
+  public void testParallelColocatedPropagationOrderPolicyPartition() throws 
Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION ));
+
+    vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( 
getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+  }
+  
+  /**
+   * HA scenario: {@code killSender} is invoked on two of the four sender members (vm4 and vm5)
+   * while puts are still in flight. The test then expects all 10000 entries on the surviving
+   * sender members and on both receiver members.
+   *
+   * @throws Exception e.g. when waiting for one of the asynchronous invocations is interrupted
+   */
+  @Test
+  public void testPartitionedParallelPropagationHA() throws Exception {
+    // SocketException is tolerated in this test (connection reset).
+    IgnoredException.addIgnoredException(SocketException.class.getName());
+    Integer senderSitePort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer receiverSitePort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, senderSitePort));
+
+    // Receiver site: vm2 and vm3.
+    createCacheInVMs(receiverSitePort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // Sender site: vm4 .. vm7, each with the same concurrent sender "ln".
+    createCacheInVMs(senderSitePort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+
+    // First batch of puts runs in the background on vm7.
+    AsyncInvocation firstPuts = vm7.invokeAsync(() -> WANTestBase.doPuts(
+        getTestMethodName() + "_PR", 5000));
+    // Wait (up to 30s) until the receiver holds more than 10 entries before killing anything.
+    vm2.invoke(() -> Awaitility.await().atMost(30000, TimeUnit.MILLISECONDS).until(() ->
+        assertEquals(
+            "Failure in waiting for at least 10 events to be received by the receiver",
+            true, getRegionSize(getTestMethodName() + "_PR") > 10)));
+    AsyncInvocation firstKill = vm4.invokeAsync(() -> WANTestBase.killSender());
+    // Second batch of puts runs in the background on vm6.
+    AsyncInvocation secondPuts = vm6.invokeAsync(() -> WANTestBase.doPuts(
+        getTestMethodName() + "_PR", 10000));
+    // Wait (up to 30s) until the receiver holds more than 7000 entries, then kill vm5 as well.
+    vm2.invoke(() -> Awaitility.await().atMost(30000, TimeUnit.MILLISECONDS).until(() ->
+        assertEquals(
+            "Failure in waiting for additional 2000 events to be received by the receiver ",
+            true, getRegionSize(getTestMethodName() + "_PR") > 7000)));
+    AsyncInvocation secondKill = vm5.invokeAsync(() -> WANTestBase.killSender());
+    firstPuts.join();
+    firstKill.join();
+    secondPuts.join();
+    secondKill.join();
+
+    // The remaining sender members must hold the complete data set ...
+    vm6.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
+    vm7.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
+
+    // ... and so must both receiver members.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 10000));
+
+    // Verify all buckets are drained on the sender nodes that are still up and running.
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+  }
+  
+  /**
+   * Puts 10 PDX-serializable entries on one of two sender members and expects the single
+   * receiver member (vm2) to end up with 10 entries.
+   */
+  @Test
+  public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender() {
+    Integer senderSitePort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer receiverSitePort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, senderSitePort));
+
+    // Receiver site: vm2 only.
+    createCacheInVMs(receiverSitePort, vm2);
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    // Sender site: vm3 and vm4.
+    createCacheInVMs(senderSitePort, vm3, vm4);
+
+    vm3.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+    vm4.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));
+
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
+
+    startSenderInVMs("ln", vm3, vm4);
+
+    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_PR", 10));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_PR", 10));
+  }
+  
+  /**
+   * Same PDX set-up as the test above, except that the senders are started (asynchronously) only
+   * after a first batch of 10 puts. A second batch of 40 puts follows and the receiver is
+   * expected to end up with 40 entries.
+   */
+  @Test
+  public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender_StartedLater() {
+    Integer senderSitePort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer receiverSitePort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, senderSitePort));
+
+    vm2.invoke(() -> WANTestBase.createReceiver_PDX(receiverSitePort));
+
+    vm3.invoke(() -> WANTestBase.createCache_PDX(senderSitePort));
+    vm4.invoke(() -> WANTestBase.createCache_PDX(senderSitePort));
+
+    vm3.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+    vm4.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 0, 2, isOffHeap()));
+
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 0, 2, isOffHeap()));
+
+    // First batch goes in before any sender has been started.
+    vm3.invoke(() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_PR", 10));
+
+    startSenderInVMsAsync("ln", vm3, vm4);
+
+    // Second batch goes in once the sender start has been triggered.
+    vm4.invoke(() -> WANTestBase.doPutsPDXSerializable(getTestMethodName() + "_PR", 40));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize_PDX(getTestMethodName() + "_PR", 40));
+  }
+
+  /**
+   * Sets the {@code TEST_HOOK} flag on the event processor of the gateway sender with the given
+   * id in this VM's cache.
+   *
+   * <p>If the sender exists but currently has no event processor, the call is a no-op.
+   *
+   * @param senderId id of the gateway sender to look up
+   * @param hook value to assign to the processor's {@code TEST_HOOK}
+   * @throws IllegalArgumentException if the cache has no gateway sender with that id
+   */
+  public static void setTestHook(String senderId, boolean hook) {
+    GatewaySender sender = null;
+    for (GatewaySender candidate : cache.getGatewaySenders()) {
+      if (candidate.getId().equals(senderId)) {
+        sender = candidate;
+        break;
+      }
+    }
+    // Fail with a meaningful message instead of an anonymous NullPointerException below.
+    if (sender == null) {
+      throw new IllegalArgumentException(
+          "No gateway sender with id '" + senderId + "' is defined in this cache");
+    }
+    ConcurrentParallelGatewaySenderEventProcessor cProc =
+        (ConcurrentParallelGatewaySenderEventProcessor) ((AbstractGatewaySender) sender)
+            .getEventProcessor();
+    if (cProc == null) {
+      return;
+    }
+    cProc.TEST_HOOK = hook;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
new file mode 100644
index 0000000..da8a631
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@SuppressWarnings("serial")
+@Category(DistributedTest.class)
+public class ConcurrentParallelGatewaySenderOffHeapDUnitTest extends
+    ConcurrentParallelGatewaySenderDUnitTest {
+
+  public ConcurrentParallelGatewaySenderOffHeapDUnitTest() {
+    super();
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
new file mode 100644
index 0000000..ab1c06b
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends 
WANTestBase {
+  private static final long serialVersionUID = 1L;
+  
+  /** Creates the test case; only the {@link WANTestBase} no-arg constructor runs. */
+  public ConcurrentParallelGatewaySenderOperation_1_DUnitTest() {
+    // super() is invoked implicitly
+  }
+
+  /**
+   * Registers the exception texts that the tests of this class ask the framework to ignore.
+   *
+   * @throws Exception declared by the overridden hook; nothing here throws a checked exception
+   */
+  @Override
+  protected final void postSetUpWANTestBase() throws Exception {
+    final String[] ignoredTexts = {"Broken pipe", "Connection reset", "Unexpected IOException"};
+    for (String ignoredText : ignoredTexts) {
+      IgnoredException.addIgnoredException(ignoredText);
+    }
+  }
+  
+  /**
+   * Senders are created but never started: after 1000 puts every sender must still report the
+   * stopped state and both receiver members must hold 0 entries.
+   */
+  @Test
+  public void testParallelGatewaySenderWithoutStarting() {
+    Integer senderSitePort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer receiverSitePort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, senderSitePort));
+
+    // Receiver site: vm2 and vm3.
+    createCacheInVMs(receiverSitePort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // Sender site: vm4 .. vm7.
+    createCacheInVMs(senderSitePort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+
+    // Note: no startSenderInVMs(...) call anywhere in this test.
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+
+    vm4.invoke(() -> WANTestBase.verifySenderStoppedState("ln"));
+    vm5.invoke(() -> WANTestBase.verifySenderStoppedState("ln"));
+    vm6.invoke(() -> WANTestBase.verifySenderStoppedState("ln"));
+    vm7.invoke(() -> WANTestBase.verifySenderStoppedState("ln"));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0));
+  }
+  
+  /**
+   * Defect 44323 (ParallelGatewaySender should not be started on Accessor node).
+   *
+   * <p>vm4 and vm5 host the partitioned region, vm6 and vm7 only create it as accessors. The
+   * senders are started on all four members and 1000 puts are expected at the remote site.
+   */
+  @Test
+  public void testParallelGatewaySenderStartOnAccessorNode() {
+    Integer senderSitePort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer receiverSitePort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, senderSitePort));
+
+    // Receiver site: vm2 and vm3.
+    createCacheInVMs(receiverSitePort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // Sender site: vm4 .. vm7.
+    createCacheInVMs(senderSitePort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY));
+
+    // Data-hosting members ...
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    // ... and accessor members.
+    vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
+        getTestMethodName() + "_PR", "ln", 1, 100));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
+        getTestMethodName() + "_PR", "ln", 1, 100));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+
+    // Start the senders on every member, accessors included.
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
+  }
+
+  
+  /**
+   * Normal scenario in which the sender is paused in between two rounds of puts.
+   *
+   * @throws Exception declared for the test framework; no checked exception is thrown directly
+   */
+  @Test
+  public void testParallelPropagationSenderPause() throws Exception {
+    Integer senderSitePort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer receiverSitePort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, senderSitePort));
+
+    // Receiver site: vm2 and vm3.
+    createCacheInVMs(receiverSitePort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // Sender site: vm4 .. vm7.
+    createCacheInVMs(senderSitePort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+
+    // No puts until every sender reports that it is running.
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+
+    // FIRST RUN: 100 puts while the senders are running.
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100));
+
+    // Pause every sender, then give the pause two seconds to settle.
+    vm4.invoke(() -> WANTestBase.pauseSender("ln"));
+    vm5.invoke(() -> WANTestBase.pauseSender("ln"));
+    vm6.invoke(() -> WANTestBase.pauseSender("ln"));
+    vm7.invoke(() -> WANTestBase.pauseSender("ln"));
+    Wait.pause(2000);
+
+    // SECOND RUN: one background thread keeps putting into the region.
+    vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+
+    // The remote region must stay at the size reached in the first run (100 entries).
+    vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(
+        getTestMethodName() + "_PR", 100));
+  }
+
+  /**
+   * Normal scenario in which a paused sender is resumed: after pause and resume all 1000 puts
+   * are expected at the remote site and every sender queue is expected to be drained.
+   *
+   * @throws Exception declared for the test framework; no checked exception is thrown directly
+   */
+  @Test
+  public void testParallelPropagationSenderResume() throws Exception {
+    Integer senderSitePort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer receiverSitePort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, senderSitePort));
+
+    // Receiver site: vm2 and vm3.
+    createCacheInVMs(receiverSitePort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // Sender site: vm4 .. vm7.
+    createCacheInVMs(senderSitePort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 8, OrderPolicy.KEY));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+
+    // No puts until every sender reports that it is running.
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+
+    // Puts run in the background while the senders are paused and resumed.
+    vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+
+    // Pause every sender ...
+    vm4.invoke(() -> WANTestBase.pauseSender("ln"));
+    vm5.invoke(() -> WANTestBase.pauseSender("ln"));
+    vm6.invoke(() -> WANTestBase.pauseSender("ln"));
+    vm7.invoke(() -> WANTestBase.pauseSender("ln"));
+
+    // ... stay paused for two seconds ...
+    Wait.pause(2000);
+
+    // ... and resume them all.
+    vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm6.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm7.invoke(() -> WANTestBase.resumeSender("ln"));
+
+    Wait.pause(2000);
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    // All 1000 entries are expected on the remote VM.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
+
+  }
+  
+  /**
+   * Negative scenario in which a sender that is stopped (and not paused) is resumed.
+   * Expected: resume is only valid for a paused sender. If a stopped sender is resumed,
+   * it will not be started again.
+   *
+   * @throws Exception declared for the test framework; no checked exception is thrown directly
+   */
+  @Test
+  public void testParallelPropagationSenderResumeNegativeScenario() throws Exception {
+    Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // Receiver site: vm2 and vm3.
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // Sender site: vm4 and vm5 are created directly against the local (ln) locator, as in the
+    // sibling tests, rather than first against the remote locator and then a second time.
+    createCacheInVMs(lnPort, vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    // wait till the senders are running
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+
+    // start the puts
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100));
+
+    // let the queue drain completely
+    vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0));
+
+    // stop the senders
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+
+    // now, try to resume a stopped sender
+    vm4.invoke(() -> WANTestBase.resumeSender("ln"));
+    vm5.invoke(() -> WANTestBase.resumeSender("ln"));
+
+    // do more puts
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+
+    // The remote region must contain only the events put at the local site
+    // before the senders were stopped.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100));
+  }
+
+  /**
+   * Normal scenario in which a sender is stopped: puts made after the stop must not change the
+   * size of the remote region.
+   *
+   * @throws Exception declared for the test framework; no checked exception is thrown directly
+   */
+  @Test
+  public void testParallelPropagationSenderStop() throws Exception {
+    Integer senderSitePort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    Integer receiverSitePort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, senderSitePort));
+
+    // Receiver site: vm2 and vm3; here the regions are created before the receivers.
+    createCacheInVMs(receiverSitePort, vm2, vm3);
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()));
+    createReceiverInVMs(vm2, vm3);
+
+    // Sender site: vm4 .. vm7.
+    createCacheInVMs(senderSitePort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender(
+        "ln", 2, true, 100, 10, false, false, null, true, 3, OrderPolicy.KEY));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // No puts until every sender reports that it is running.
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+
+    // FIRST RUN: 100 puts while the senders are running.
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100));
+
+    // Stop every sender.
+    vm4.invoke(() -> WANTestBase.stopSender("ln"));
+    vm5.invoke(() -> WANTestBase.stopSender("ln"));
+    vm6.invoke(() -> WANTestBase.stopSender("ln"));
+    vm7.invoke(() -> WANTestBase.stopSender("ln"));
+
+    // SECOND RUN: one background thread keeps putting into the region.
+    vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+
+    // The remote region must stay at the size reached in the first run (100 entries).
+    vm2.invoke(() -> WANTestBase.validateRegionSizeRemainsSame(
+        getTestMethodName() + "_PR", 100));
+  }
+
+  /**
+   * Normal scenario in which a sender is stopped and then started again.
+   */
+  @Test
+  public void testParallelPropagationSenderStartAfterStop() throws Throwable {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+
+    //make sure all the senders are running before doing any puts
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    //FIRST RUN: now, the senders are started. So, do some of the puts
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 ));
+    
+    //now, stop all of the senders
+    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm6.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm7.invoke(() -> WANTestBase.stopSender( "ln" ));
+    
+    Wait.pause(2000);
+
+    //SECOND RUN: do some of the puts after the senders are stopped
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    
+    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
+    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
+    
+    //start the senders again
+    AsyncInvocation vm4start = vm4.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
+    AsyncInvocation vm5start = vm5.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
+    AsyncInvocation vm6start = vm6.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
+    AsyncInvocation vm7start = vm7.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
+    int START_TIMEOUT = 30000;
+    vm4start.getResult(START_TIMEOUT);
+    vm5start.getResult(START_TIMEOUT);
+    vm6start.getResult(START_TIMEOUT);
+    vm7start.getResult(START_TIMEOUT);
+
+    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
+    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
+
+    //SECOND RUN: do some more puts
+    AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 1000 ));
+    async.join();
+    
+    //verify all the buckets on all the sender nodes are drained
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    //verify the events propagate to remote site
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000 ));
+    
+    vm4.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+    vm5.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+    vm6.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+    vm7.invoke(() -> WANTestBase.validateQueueSizeStat( "ln", 0 ));
+  }
+
+  /**
+   * Stops a running concurrent parallel sender and starts it again while puts
+   * are still in progress.
+   * Differs from the preceding test case in that, while the sender is starting
+   * from the stopped state, puts are simultaneously happening on the region by
+   * another thread (an {@code invokeAsync} call on vm4).
+   * <p>
+   * Flow, as written in the body:
+   * <ol>
+   * <li>vm0 and vm1 create the two locators; vm2/vm3 get caches, the
+   *     {@code _PR} region and receivers; vm4..vm7 get caches, the concurrent
+   *     sender "ln" and the {@code _PR} region created with "ln".</li>
+   * <li>The senders are started and the test waits for the running state on
+   *     all four sender VMs.</li>
+   * <li>{@code doPuts(..., 200)} runs on vm4, then all four senders are stopped
+   *     and the test pauses for 2 seconds.</li>
+   * <li>{@code doPuts(..., 1000)} runs on vm4 while the senders are stopped and
+   *     vm2 is checked with {@code validateRegionSizeRemainsSame(..., 200)}.</li>
+   * <li>{@code doPuts(..., 5000)} is started asynchronously on vm4 and the
+   *     senders are started through {@code startSenderInVMsAsync} while those
+   *     puts are running.</li>
+   * <li>After joining the async puts and pausing 2 seconds, every sender VM is
+   *     checked with {@code validateParallelSenderQueueAllBucketsDrained} and
+   *     vm4 with {@code validateQueueContents("ln", 0)}.</li>
+   * </ol>
+   * The test is currently disabled via {@code @Ignore("Bug47553")}.
+   * <p>
+   * NOTE(review): the result of the async puts is only joined, not inspected
+   * here; whether {@code join()} surfaces a remote failure depends on
+   * {@code AsyncInvocation}, which is not visible in this file - confirm.
+   *
+   * @throws Exception propagated from the remote invocations or from joining
+   *         the async invocation
+   */
+  @Ignore("Bug47553")
+  @Test
+  public void testParallelPropagationSenderStartAfterStop_Scenario2() throws 
Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //make sure the sender reports the running state on all four VMs before doing any puts
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    LogWriterUtils.getLogWriter().info("All the senders are now started");
+    
+    //FIRST RUN: the senders are started, so do the first 200 puts from vm4
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 ));
+    
+    LogWriterUtils.getLogWriter().info("Done few puts");
+    
+    //now stop the sender on every sender VM
+    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm6.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm7.invoke(() -> WANTestBase.stopSender( "ln" ));
+    
+    LogWriterUtils.getLogWriter().info("All the senders are stopped");
+    Wait.pause(2000); // fixed 2 second pause between stopping the senders and the next puts
+    
+    //SECOND RUN: do 1000 puts from vm4 while the senders are stopped
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    LogWriterUtils.getLogWriter().info("Done some more puts in second run");
+    
+    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
+    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
+    
+    //THIRD RUN: start 5000 puts asynchronously from vm4 so they overlap with the sender restart below
+    AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 5000 ));
+    LogWriterUtils.getLogWriter().info("Started high number of puts by async 
thread");
+
+    LogWriterUtils.getLogWriter().info("Starting the senders at the same 
time");
+    //while the puts are happening on the async thread, start the senders
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("All the senders are started"); // NOTE(review): startSenderInVMsAsync is defined elsewhere; whether it waits for the starts to complete is not visible here
+    
+    async.join();
+        
+    Wait.pause(2000); // fixed 2 second pause after the async puts have been joined
+    
+    //verify all the buckets on all four sender nodes are drained
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    //verify that the queue size ultimately becomes zero. That means all the 
events propagate to remote site.
+    vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )); // only vm4's queue contents are checked here
+  }
+  
+  /**
+   * Stops the concurrent parallel sender "ln" and starts it again when two of
+   * the four sender VMs host the region only as accessors.
+   * <p>
+   * Flow, as written in the body:
+   * <ol>
+   * <li>vm0 and vm1 create the two locators; caches are created in vm2/vm3
+   *     (remote site) and vm4..vm7 (local site).</li>
+   * <li>vm4 and vm5 create the {@code _PR} region with
+   *     {@code createPartitionedRegion}; vm6 and vm7 create it with
+   *     {@code createPartitionedRegionAsAccessor}. All four then create and
+   *     start the sender "ln".</li>
+   * <li>vm2/vm3 create the {@code _PR} region and the receivers.</li>
+   * <li>{@code doPuts(..., 200)} runs on vm4, the sender is stopped on all four
+   *     VMs, {@code doPuts(..., 1000)} runs while it is stopped, and vm2 is
+   *     checked with {@code validateRegionSizeRemainsSame(..., 200)}.</li>
+   * <li>The sender is restarted on all four VMs through async invocations that
+   *     are joined, the running state is awaited, and vm2 is checked again
+   *     with {@code validateRegionSizeRemainsSame(..., 200)}.</li>
+   * <li>{@code doPuts(..., 1000)} runs once more; bucket draining is validated
+   *     on vm4 and vm5 only, and vm2 is checked with
+   *     {@code validateRegionSize(..., 1000)}.</li>
+   * </ol>
+   * NOTE(review): the inline comment before the first group of
+   * {@code waitForSenderRunningState} calls says the senders are not running
+   * on accessor nodes, but the code waits for the running state on vm6 and
+   * vm7 (the accessors) as well - confirm which is intended.
+   * <p>
+   * NOTE(review): the restart invocations are only joined; unlike the
+   * preceding test, which calls {@code getResult(START_TIMEOUT)}, their
+   * outcome is not inspected here.
+   *
+   * @throws Throwable propagated from the remote invocations or from joining
+   *         the async invocations
+   */
+  @Test
+  public void testParallelPropagationSenderStartAfterStopOnAccessorNode() 
throws Throwable {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
+        getTestMethodName() + "_PR", "ln", 1, 100));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
+        getTestMethodName() + "_PR", "ln", 1, 100));
+
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+      true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+      true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+      true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+      true, 100, 10, false, false, null, true, 4, OrderPolicy.KEY ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+
+    createReceiverInVMs(vm2, vm3);
+
+    //make sure all the senders are not running on accessor nodes and running 
on non-accessor nodes
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    //FIRST RUN: the senders are started, so do the first 200 puts from vm4
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 200 ));
+
+    //now stop the sender on every sender VM, accessors included
+    vm4.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm6.invoke(() -> WANTestBase.stopSender( "ln" ));
+    vm7.invoke(() -> WANTestBase.stopSender( "ln" ));
+
+    //SECOND RUN: do 1000 puts from vm4 while the senders are stopped
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    
+    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
+    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
+    
+    //start the senders again, one async invocation per VM, then join all four
+    AsyncInvocation vm4start = vm4.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
+    AsyncInvocation vm5start = vm5.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
+    AsyncInvocation vm6start = vm6.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
+    AsyncInvocation vm7start = vm7.invokeAsync(() -> WANTestBase.startSender( 
"ln" ));
+    vm4start.join();
+    vm5start.join();
+    vm6start.join();
+    vm7start.join();
+
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+
+    //Region size on remote site should remain same and below the number of 
puts done in the FIRST RUN
+    vm2.invoke(() -> 
WANTestBase.validateRegionSizeRemainsSame(getTestMethodName() + "_PR", 200 ));
+
+    //THIRD RUN: repeat doPuts with 1000 now that the senders are running again
+    AsyncInvocation async = vm4.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 1000 ));
+    async.join();
+
+    //verify all buckets drained; checked only on the non-accessor nodes vm4 and vm5
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    //verify the events propagate to the remote site (vm2 is expected to hold 1000 entries)
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + 
"_PR", 1000 ));
+  }
+
+  
+  /**
+   * Normal scenario in which a combinations of start, pause, resume operations
+   * is tested
+   */
+  @Test
+  public void testStartPauseResumeParallelGatewaySender() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created cache on local site");
+    
+    vm4.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
+    vm5.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
+    vm6.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
+    vm7.invoke(() -> WANTestBase.createConcurrentSender( "ln", 2,
+        true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY ));
+    
+    LogWriterUtils.getLogWriter().info("Created senders on local site");
+    
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    LogWriterUtils.getLogWriter().info("Created PRs on local site");
+    
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    LogWriterUtils.getLogWriter().info("Created PRs on remote site");
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    LogWriterUtils.getLogWriter().info("Done 1000 puts on local site");
+    
+    //Since puts are already done on userPR, it will have the buckets created. 
+    //During sender start, it will wait until those buckets are created for 
shadowPR as well.
+    //Start the senders in async threads, so colocation of shadowPR will be 
complete and 
+    //missing buckets will be created in 
PRHARedundancyProvider.createMissingBuckets().
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    LogWriterUtils.getLogWriter().info("Started senders on local site");
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 ));
+    LogWriterUtils.getLogWriter().info("Done 5000 puts on local site");
+    
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    LogWriterUtils.getLogWriter().info("Paused senders on local site");
+    
+    vm4.invoke(() -> WANTestBase.verifySenderPausedState( "ln" ));
+    vm5.invoke(() -> WANTestBase.verifySenderPausedState( "ln" ));
+    vm6.invoke(() -> WANTestBase.verifySenderPausedState( "ln" ));
+    vm7.invoke(() -> WANTestBase.verifySenderPausedState( "ln" ));
+    
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( 
getTestMethodName() + "_PR", 1000 ));
+    LogWriterUtils.getLogWriter().info("Started 1000 async puts on local 
site");
+
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    LogWriterUtils.getLogWriter().info("Resumed senders on local site");
+
+    vm4.invoke(() -> WANTestBase.verifySenderResumedState( "ln" ));
+    vm5.invoke(() -> WANTestBase.verifySenderResumedState( "ln" ));
+    vm6.invoke(() -> WANTestBase.verifySenderResumedState( "ln" ));
+    vm7.invoke(() -> WANTestBase.verifySenderResumedState( "ln" ));
+
+    try {
+      inv1.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail("Interrupted the async invocation.");
+    }
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> 
WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 5000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 5000 ));
+  }
+}

Reply via email to